Hi Haizhou,
Thanks for sharing the use cases!

For use case 1, I think Peter mentioned that the wide table feature might
help with backfilling and updating a single column. However, keeping both
base data and rank data in the same table doesn’t seem low-maintenance,
especially the rankings are recalculated frequently. In my view, if the
engine supports it, using a materialized view with auto-refresh offers a
more ops free way.

For use case 2, Indexing might help mitigate the issue of scattered
deletes, but given that your use case is based on timestamps, I feel a more
effective approach could be leveraging sort order on the timestamp column.
This would help co-locate records that are likely to be deleted together,
reducing the spread of delete files across the table.

Thanks,
Xiaoxuan

On Fri, Jun 13, 2025 at 5:01 AM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Hi Haizhou Zhao,
>
> Thanks for the detailed use-cases!
>
> Quick question for the Use Case 1: Do you have a primary key to identify
> the updated rows? How wide, complicated is this key? Is the ranking always
> recalculated for the full partition? If so the discussion around the wide
> tables (https://lists.apache.org/thread/h0941sdq9jwrb6sj0pjfjjxov8tx7ov9)
> could be interesting for you, as it might provide a way to update a column
> or column family for a whole table without rewriting the table.
>
> Use Case 2 is a bit more tricky, as it would require us to maintain a
> secondary index for a non primary key column(s)
>
> Thanks, Peter
>
>
> On Fri, Jun 13, 2025, 02:08 Haizhou Zhao <zhaohaizhou940...@gmail.com>
> wrote:
>
>> Hey Xiaoxuan,
>>
>> Want to bring up a couple use cases that might be related to your
>> proposal, but you can tell me whether they are relevant or not from your
>> perspective.
>>
>> Use Case 1: Ranking
>>
>> A user has a table with N+1 columns, the first N columns are the base
>> data (events she gathered), the last column is a ranking number for each
>> row. The ranking is defined within the partition, so the ranking column for
>> all rows are initially NaN, and will be updated when the entire (or close
>> to the entire) partition arrives at a later point.
>>
>> UPDATE tbl SET my_rank = rank_func(col_i, col_j, col_k …) WHERE pk_col_1
>> = pk1, pk_col_2 = pk2 …
>>
>> The above statement is just saying: now that my entire partition has been
>> ingested, please calculate all the rankings for all the rows in this
>> partition and update onto the table.
>>
>> I think the best way to visualize this use case is to think of a typical
>> social media company,  who needs to constantly generate the top N feed/news
>> from the previous day, the past hour, etc. Let’s just say my customer has a
>> use case similar in nature (more or less a typical streaming with fixed
>> window use case).
>>
>> In this case, there are downstream consumers of my table. For example,
>> with ranking info populated, downstream consumers want to further analyze
>> if there is something in common for events that have high ranks across
>> partitions. So query speed on this table matters. And let’s just assume we
>> need to keep up the query performance on this table no matter how
>> frequently we are updating ranking info into this table.
>>
>> Now, the restriction here is I cannot “copy on write”, because the base
>> event data (the first N columns) is too big, “copy on write” would simply
>> be unaffordable. I cannot update my ranking info into my table using
>> “positional deletes”, because writing “positional deletes” is slower than
>> writing “equality deletes”, making my downstream consumers waiting longer
>> to obtain ranking info for the latest partition. Let’s just say my
>> downstream consumers want ranking info to be presented to them in a timely
>> manner.
>>
>> Additional Notes: I guess many folks here might suggest not putting
>> ranking info and the base event data into the same table. Instead, keep the
>> base event data in a base table, and then create a view (or better
>> materialized view) to store base event data + ranking info together. This
>> is more or less my customer’s decision not mine, they do not want to keep a
>> base table, and a view (materialized view) at the same time, because, from
>> their perspective, the ops load for maintaining a base table + a view is
>> significantly higher than maintaining a single table. Also, the ranking
>> here is not "write-once", there is a chance that the ranking will be
>> recalculated for an old partition with ranking already calculated. It might
>> be because the ranking_func/rule changed, or there's an error in previously
>> calculated rank (i.e. due to extremely long latency for some data on
>> ingestion side, the partition is not complete enough to derive the correct
>> ranking info previously).
>>
>> Use Case 2: Retention
>>
>> Now, if every Iceberg table in the world uses a time based partition,
>> then running a retention workflow is as easy as just dropping partitions as
>> time comes. Unfortunately that assumption is not true. So in case of
>> removing outdated rows on an Iceberg table not partitioned by time, such a
>> retention workflow will generate delete files spread around all partitions.
>> And again, sometimes you cannot choose “positional deletes” for such a
>> retention workflow because this will extend the runtime of the retention
>> workflow. Having a long running maintenance workflow (such as retention, or
>> compaction) can be a bad idea if not disastrous, especially in a streaming,
>> low latency world.
>>
>> With that being said, you can easily imagine what will happen next: a
>> retention workflow just finished, the query speed on this non time
>> partitioned table will suffer, until a compaction job kicks in later to
>> reform this table for better read performance.
>>
>>
>> On Tue, Jun 10, 2025 at 4:54 PM Xiaoxuan Li <xiaoxuan.li....@gmail.com>
>> wrote:
>>
>>> Thank you for the thoughtful feedback, Yan, and for bringing up these
>>> important questions.
>>>
>>> > How realistic is the scenario I've described, and what's the
>>> likelihood of encountering it in production environments?
>>>
>>> I don’t have direct visibility into that either, but I’ve seen some
>>> vendors claim they can achieve sub-minute latency and write CDC streams to
>>> Iceberg with thousands of changes per minute. I’ll defer to others who may
>>> have more hands-on experience.
>>>
>>> > and in such situation users should be expected to firstly perform some
>>> other techniques such as compaction or other workload optimizations before
>>> considering adopting this index feature; but meanwhile I think we do want
>>> to make sure for well-maintained tables and valid/common use cases, the new
>>> proposal will not inadvertently creating limitations or bottlenecks.
>>>
>>> Yes, I totally agree. We definitely don’t want to introduce a new
>>> bottleneck while trying to solve an existing limitation. That’s one of the
>>> key reasons I’m proposing a file-level index approach. But as I’ve been
>>> thinking more about it, beyond caching index files on disk and storing
>>> footers in executor memory, we could also consider consolidated indexing at
>>> the manifest level. This would involve aggregating file-level indexes into
>>> the manifest, which could significantly improve efficiency, especially for
>>> handling many small files. We could apply a similar consolidation strategy
>>> to older partitions as well, potentially tying them to a separate
>>> manifest-level index file. Depending on the index size, we can decide
>>> whether to embed the index directly in the manifest or store it as a
>>> standalone file.
>>>
>>>
>>> Thanks Haizhou for your interest!
>>>
>>> > 1. Is this an Iceberg issue, or a Parquet (table format provider)
>>> issue?
>>>
>>> Like Steven mentioned, parquet is a file format. But your point is
>>> valid, Parquet is optimized for sequential scan and lacks efficient
>>> random access capabilities. That’s why we need an index to support fast
>>> lookups. Each file format is designed with different workload patterns, and
>>> that’s how I’m thinking about this problem, by considering the strengths
>>> and limitations of each format for the use case. Hope that makes sense.
>>>
>>>
>>> > I think there is mention of the newly added index files requiring its
>>> own compaction workflow. A severe pain point today for many of our Flink
>>> based ingestion/DA/ETL users is that the compaction workflow takes longer
>>> than the commit interval - the highly frequent data change commit basically
>>> blocks the compaction commit from going into the table for an extremely
>>> long time (theoretically, could be forever).
>>>
>>> Thanks for pointing this out. If we're using file level indexing, the
>>> index files would be compacted alongside data files as part of the regular
>>> compaction workflow, so it shouldn't introduce additional compaction
>>> cycles. I’d love to learn more about your use cases, particularly how your
>>> table is partitioned, commit Interval, avg scale of your table, if you’re
>>> open to sharing. That context would help us understand how the indexing
>>> strategy might fit.
>>>
>>>
>>> Thanks,
>>>
>>> Xiaoxuan
>>>
>>> On Fri, Jun 6, 2025 at 7:34 PM Yan Yan <yyany...@gmail.com> wrote:
>>>
>>>> Thanks Xiaoxuan for the detailed proposal and everyone for the great
>>>> discussion!
>>>>
>>>> It seems to me that it feels more valuable if we can firstly clearly
>>>> define the specific use case we're trying to address, as this would help us
>>>> make more informed decisions about trade-offs between file vs partitioned
>>>> level indexing.
>>>>
>>>> Even if we already set up our goal to be leveraging indexing to replace
>>>> most equality-based deletes to positional deletes for CDC scenarios, I
>>>> think we need to consider the specific characteristics of both the table
>>>> and streaming workload. For instance, in a CDC environment where data
>>>> streams in quick and small batches, one extreme case I could imagine is a
>>>> table could accumulate hundreds of thousands of small files, with records
>>>> distributed randomly, and CDC dumps data in rapid succession leading to
>>>> fast increase in number of files.
>>>>
>>>> In such scenarios, meeting the requirement of "the table should be
>>>> partitioned and sorted by the PK" for efficient file-level indexing might
>>>> not easily be achievable; and the randomized data distribution and the
>>>> sheer volume of files (each with its own index) would require loading a big
>>>> number of index files into driver's memory to determine the correct delete
>>>> positions per data file for upserts. While thread pooling could help
>>>> optimize concurrent file reads and reduce S3 connection overhead, when
>>>> dealing with opening thousands of files or more, it would pose a great
>>>> challenge for the driver to operate well even when the concurrent reading
>>>> capabilities, preloading and caching mechanisms are in use.
>>>>
>>>> On the other hand, I could also see a complicated partitioned/global
>>>> index approach, if not designed or even configured well, could introduce
>>>> complications at write scenario even for straightforward pure insert
>>>> operations, potentially resulting in much more overhead than the file-level
>>>> index proposal aimed at optimizing CDC writes. Additional considerations
>>>> like multi-writer scenarios could further add complexity, as Xiaoxuan
>>>> previously mentioned (Peter/Steven, I think if you wouldn't mind sharing
>>>> more details about the proposal you mentioned earlier that would be great;
>>>> maybe we could have some offline discussion on this).
>>>>
>>>> My general questions would be:
>>>> 1. How realistic is the scenario I've described, and what's the
>>>> likelihood of encountering it in production environments? I personally
>>>> indeed do not have much visibility into this.
>>>> 2. Should this scenario be within the scope of our proposal? I could
>>>> totally understand the argument that a poorly maintained tables would
>>>> naturally incur performance penalties, and in such situation users should
>>>> be expected to firstly perform some other techniques such as compaction or
>>>> other workload optimizations before considering adopting this index
>>>> feature; but meanwhile I think we do want to make sure for well-maintained
>>>> tables and valid/common use cases, the new proposal will not inadvertently
>>>> creating limitations or bottlenecks.
>>>>
>>>> Thanks,
>>>> Yan
>>>>
>>>> On Wed, Jun 4, 2025 at 8:59 PM Steven Wu <stevenz...@gmail.com> wrote:
>>>>
>>>>> Haizhou,
>>>>>
>>>>> 1. it is probably inaccurate to call Parquet a table format provider.
>>>>> Parquet is a just file format. Delete vectors (position deletes) are
>>>>> outside the scope of Parquet files. The nature of equality deletes just
>>>>> make it impossible to read in constant time O(1)
>>>>>
>>>>> 2. The inverted index idea is still in early discussions and not ready
>>>>> to be shared with the broader community. But to your question, it won't
>>>>> make compaction worse (slower). Totally understand the pain point you
>>>>> raised. It should be discussed and covered in whatever proposal.
>>>>>
>>>>> Thanks,
>>>>> Steven
>>>>>
>>>>> On Wed, Jun 4, 2025 at 5:52 PM Haizhou Zhao <
>>>>> zhaohaizhou940...@gmail.com> wrote:
>>>>>
>>>>>> Hey folks,
>>>>>>
>>>>>> Thanks for discussing this interesting topic. I have couple relevant
>>>>>> thoughts while reading through this thread:
>>>>>>
>>>>>> 1. Is this an Iceberg issue, or a Parquet (table format provider)
>>>>>> issue? For example, if Parquet (or other table format provider) provides 
>>>>>> a
>>>>>> mechanism where both query by position and query by equality in a data 
>>>>>> file
>>>>>> take constant time O(1), then would equality deletes still cause the same
>>>>>> kind of pain for Iceberg tables? For the Iceberg community, how do we
>>>>>> define the boundary of responsibility between the Iceberg project and 
>>>>>> other
>>>>>> table format providers like Parquet?
>>>>>>
>>>>>> 2. I think there is mention of the newly added index
>>>>>> files requiring its own compaction workflow. A severe pain point today 
>>>>>> for
>>>>>> many of our Flink based ingestion/DA/ETL users is that the compaction
>>>>>> workflow takes longer than the commit interval - the highly frequent data
>>>>>> change commit basically blocks the compaction commit from going into the
>>>>>> table for an extremely long time (theoretically, could be forever). I
>>>>>> understand our problem here is entirely different from the proposal in 
>>>>>> this
>>>>>> thread, but will the newly added index files push up the compaction
>>>>>> workflow runtime significantly, and thus making an existing issue worse?
>>>>>>
>>>>>> Thanks,
>>>>>> -Haizhou
>>>>>>
>>>>>> On Wed, Jun 4, 2025 at 3:21 PM Xiaoxuan Li <xiaoxuan.li....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Totally agree, supporting mutability on top of immutable storage at
>>>>>>> scale is a non-trivial problem.
>>>>>>> I think the number of index files is ok, we can preload them in
>>>>>>> parallel or cache them on disk. Not sure yet about caching deserialized
>>>>>>> data, that might need some more thought.
>>>>>>>
>>>>>>> Xiaoxuan
>>>>>>>
>>>>>>> On Wed, Jun 4, 2025 at 5:21 AM Péter Váry <
>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>
>>>>>>>> > Our primary strategy for accelerating lookups focuses on
>>>>>>>> optimizing the index file itself, leveraging sorted keys, smaller row 
>>>>>>>> group
>>>>>>>> sizes, and Bloom filters for Parquet files. We’re also exploring custom
>>>>>>>> formats that support more fine-grained skipping.
>>>>>>>>
>>>>>>>> The techniques you mentioned are important, but in blob stores, the
>>>>>>>> number of files that need to be accessed often has a greater impact 
>>>>>>>> than
>>>>>>>> how each file is read.
>>>>>>>>
>>>>>>>> > > but if there are updates around your PK range, then you need to
>>>>>>>> read more index files.
>>>>>>>>
>>>>>>>> > Yes, as the table grows, the index files will scale accordingly.
>>>>>>>>
>>>>>>>> I wanted to point out the following issue: Even if the table size
>>>>>>>> remains constant and only updates occur, the number of files that need 
>>>>>>>> to
>>>>>>>> be accessed continues to grow.
>>>>>>>>
>>>>>>>>
>>>>>>>> > when updating many random keys, it's likely to touch nearly all
>>>>>>>> buckets, which increases the number of index files that must be 
>>>>>>>> scanned.
>>>>>>>> This is exactly why the lookup performance of the index file itself 
>>>>>>>> becomes
>>>>>>>> so critical.
>>>>>>>>
>>>>>>>> Once again, in blob storage systems, file access does not scale
>>>>>>>> efficiently. To address this, we need a strategy that enables result
>>>>>>>> caching. However, caching based on data files becomes ineffective when
>>>>>>>> there are frequent updates. In such cases, we must access every index 
>>>>>>>> file
>>>>>>>> added after the original file was created. While it's possible to 
>>>>>>>> filter
>>>>>>>> out some index files using statistics, this is largely unreliable due 
>>>>>>>> to
>>>>>>>> the inherently random nature of data ingestion.
>>>>>>>>
>>>>>>>> Xiaoxuan Li <xiaoxuan.li....@gmail.com> ezt írta (időpont: 2025.
>>>>>>>> jún. 4., Sze, 0:45):
>>>>>>>>
>>>>>>>>> Hi Peter,
>>>>>>>>>
>>>>>>>>> > If the table is partitioned and sorted by the PK, we don't
>>>>>>>>> really need to have any index. We can find the data file containing 
>>>>>>>>> the
>>>>>>>>> record based on the Content File statistics, and the RowGroup 
>>>>>>>>> containing
>>>>>>>>> the record based on the Parquet metadata.
>>>>>>>>>
>>>>>>>>> Our primary strategy for accelerating lookups focuses on
>>>>>>>>> optimizing the index file itself, leveraging sorted keys, smaller row 
>>>>>>>>> group
>>>>>>>>> sizes, and Bloom filters for Parquet files. We’re also exploring 
>>>>>>>>> custom
>>>>>>>>> formats that support more fine-grained skipping.
>>>>>>>>>
>>>>>>>>> But I agree, managing indexes is like managing a table. If we
>>>>>>>>> tightly coupled partitioning and file skipping to the base table, it 
>>>>>>>>> could
>>>>>>>>> limit flexibility and broader use cases. By having global indexes, we 
>>>>>>>>> can
>>>>>>>>> decouple those constraints and enable manifests level skipping by 
>>>>>>>>> using a
>>>>>>>>> different partition strategy for indexes. But that also leads us into 
>>>>>>>>> a
>>>>>>>>> kind of circular dependency, do we want to treat the index as a 
>>>>>>>>> table? I'm
>>>>>>>>> happy to continue iterating on this idea.
>>>>>>>>>
>>>>>>>>> > but if there are updates around your PK range, then you need to
>>>>>>>>> read more index files.
>>>>>>>>>
>>>>>>>>> Yes, as the table grows, the index files will scale accordingly.
>>>>>>>>>
>>>>>>>>> > Do I understand correctly that your use case is updating a
>>>>>>>>> single record
>>>>>>>>>
>>>>>>>>> That was just an example, I'm planning to use a workload that
>>>>>>>>> accesses 0.01% of records from a 2-billion-row dataset as the 
>>>>>>>>> experimental
>>>>>>>>> baseline. But yes, when updating many random keys, it's likely to 
>>>>>>>>> touch
>>>>>>>>> nearly all buckets, which increases the number of index files that 
>>>>>>>>> must be
>>>>>>>>> scanned. This is exactly why the lookup performance of the index file
>>>>>>>>> itself becomes so critical.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Xiaoxuan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jun 3, 2025 at 6:37 AM Péter Váry <
>>>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Xiaoxuan,
>>>>>>>>>>
>>>>>>>>>> > 2. File-Level Indexing
>>>>>>>>>> > [..]
>>>>>>>>>> > To make this efficient, the table should be partitioned and
>>>>>>>>>> sorted by the PK.
>>>>>>>>>>
>>>>>>>>>> If the table is partitioned and sorted by the PK, we don't really
>>>>>>>>>> need to have any index. We can find the data file containing the 
>>>>>>>>>> record
>>>>>>>>>> based on the Content File statistics, and the RowGroup containing the
>>>>>>>>>> record based on the Parquet metadata.
>>>>>>>>>> If the data is not fully sorted, then we will have multiple
>>>>>>>>>> index files to read. For example, if data is inserted and then 20 
>>>>>>>>>> updates
>>>>>>>>>> occur where the primary key falls within the range of the updated
>>>>>>>>>> records—but our specific key is not among those updated—we still 
>>>>>>>>>> need to
>>>>>>>>>> scan all files to find the current values for the record. There ways
>>>>>>>>>> around to filter out these files, but index file doesn't help here 
>>>>>>>>>> much.
>>>>>>>>>>
>>>>>>>>>> > Index access is naturally distributed as index files are
>>>>>>>>>> co-located with data files during scan.
>>>>>>>>>>
>>>>>>>>>> I don't get this. Sure, you can co-locate the index file with the
>>>>>>>>>> file you are reading, but if there are updates around your PK range, 
>>>>>>>>>> then
>>>>>>>>>> you need to read more index files.
>>>>>>>>>>
>>>>>>>>>> About the use-cases:
>>>>>>>>>>
>>>>>>>>>> Do I understand correctly that your use case is updating a single
>>>>>>>>>> record (like "UPDATE users SET discount='GOLD' WHERE 
>>>>>>>>>> user_id='GUID'"), and
>>>>>>>>>> not like a updating multiple records at once (like "UPDATE users SET
>>>>>>>>>> discount='GOLD' WHERE purchase_value > 1000")?
>>>>>>>>>>
>>>>>>>>>> IMHO, if we have updates for many records, caching is a must.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Peter
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 3, 2025, 04:43 Xiaoxuan Li <xiaoxuan.li....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks, Peter, for bringing these ideas forward! and you also
>>>>>>>>>>> raised a great point about clarifying the goal of indexing. I’ve 
>>>>>>>>>>> been
>>>>>>>>>>> considering it with the intention of eventually enabling fast 
>>>>>>>>>>> upserts
>>>>>>>>>>> through DVs. To support that, we need an index that maps primary 
>>>>>>>>>>> keys to
>>>>>>>>>>> both the data file and the key’s location within it.So the key 
>>>>>>>>>>> lookup speed
>>>>>>>>>>> is crucial here. This will allow us to quickly determine whether a 
>>>>>>>>>>> key
>>>>>>>>>>> already exists and, if so, identify the location it resides in. In 
>>>>>>>>>>> the
>>>>>>>>>>> event no data file contains the key, we can simply append new 
>>>>>>>>>>> records.
>>>>>>>>>>>
>>>>>>>>>>> So there are two primary strategies for implementing indexing,
>>>>>>>>>>> and based on your description, you and Steven’s approach seems to 
>>>>>>>>>>> fit more
>>>>>>>>>>> closely into the first one. Let me know if that’s accurate.
>>>>>>>>>>> 1. *Partitioned (Global) Indexing*
>>>>>>>>>>>
>>>>>>>>>>> This approach builds a *table-level global index* that can be
>>>>>>>>>>> partitioned independently of the base table’s partition scheme. 
>>>>>>>>>>> Instead of
>>>>>>>>>>> aligning with table partitions, the index can be bucketed using a 
>>>>>>>>>>> hash
>>>>>>>>>>> function on primary keys, ensuring that each PK deterministically 
>>>>>>>>>>> maps to a
>>>>>>>>>>> specific index file.
>>>>>>>>>>>
>>>>>>>>>>> This model is well-suited for bulk index initial loading,
>>>>>>>>>>> providing consistent and efficient point lookups through 
>>>>>>>>>>> deterministic
>>>>>>>>>>> key-to-file mapping. The primary challenge is maintaining index 
>>>>>>>>>>> freshness
>>>>>>>>>>> as new records arrive, requiring mechanisms to keep the index files
>>>>>>>>>>> synchronized with the underlying data. This approach is similar to 
>>>>>>>>>>> how Hudi
>>>>>>>>>>> manages its record index in that the number of partitions matches 
>>>>>>>>>>> the
>>>>>>>>>>> number of index file groups(this is configurable to the user).
>>>>>>>>>>>
>>>>>>>>>>> *Pros:*
>>>>>>>>>>>
>>>>>>>>>>>    - *Flexible partitioning*: Index partitions are decoupled
>>>>>>>>>>>    from table partitions, allowing more control over lookup 
>>>>>>>>>>> performance.
>>>>>>>>>>>    - Index files do not need to be rewritten during compaction
>>>>>>>>>>>    jobs.
>>>>>>>>>>>
>>>>>>>>>>> *Cons:*
>>>>>>>>>>>
>>>>>>>>>>>    - *Synchronous maintenance*: Index must be kept up to date
>>>>>>>>>>>    during each write, adding complexity. And with index files in 
>>>>>>>>>>> each
>>>>>>>>>>>    partition accumulating, a compaction job for index files in each 
>>>>>>>>>>> partition
>>>>>>>>>>>    might be needed.
>>>>>>>>>>>    - *Distributed access*: I don’t yet see a clean way to read
>>>>>>>>>>>    and maintain the index in a distributed fashion across engines. 
>>>>>>>>>>> This aspect
>>>>>>>>>>>    needs some further design and brainstorming.
>>>>>>>>>>>
>>>>>>>>>>> ------------------------------
>>>>>>>>>>> 2. *File-Level Indexing*
>>>>>>>>>>>
>>>>>>>>>>> In this approach, *each data file has a corresponding index
>>>>>>>>>>> file*. When an upsert arrives, we rely on *table partitioning*
>>>>>>>>>>> and *file pruning* to reduce the number of index files we need
>>>>>>>>>>> to scan. To make this efficient, the table should be partitioned 
>>>>>>>>>>> and sorted
>>>>>>>>>>> by the PK.
>>>>>>>>>>>
>>>>>>>>>>> We can also further accelerate index file pruning using *Bloom
>>>>>>>>>>> filters*:
>>>>>>>>>>>
>>>>>>>>>>>    - Either as a *centralized Bloom index*, storing filters for
>>>>>>>>>>>    all files in one place. (table level bloom filter index?)
>>>>>>>>>>>    - Or embedded within *Parquet File* (if the index itself is
>>>>>>>>>>>    Parquet).
>>>>>>>>>>>
>>>>>>>>>>> *Pros:*
>>>>>>>>>>>
>>>>>>>>>>>    - No need to track index-data consistency. If an index file
>>>>>>>>>>>    is missing, we just scan the data file directly.
>>>>>>>>>>>    - Index access is naturally *distributed* as index files are
>>>>>>>>>>>    co-located with data files during scan.
>>>>>>>>>>>
>>>>>>>>>>> *Cons:*
>>>>>>>>>>>
>>>>>>>>>>>    - *Less flexible*: The index needs to use the same
>>>>>>>>>>>    partitioning strategy as the table.
>>>>>>>>>>>    - *Compaction overhead*: Whenever data files are compacted
>>>>>>>>>>>    or rewritten, the corresponding index files must also be updated.
>>>>>>>>>>>
>>>>>>>>>>> ------------------------------
>>>>>>>>>>>
>>>>>>>>>>> I am still leaning toward Option 2 for its simplicity. And, if
>>>>>>>>>>> combined with the Bloom filters, its performance is promising. 
>>>>>>>>>>> While I
>>>>>>>>>>> agree that Option 1 is also worth considering because of the 
>>>>>>>>>>> flexibility it
>>>>>>>>>>> offers to partition skipping, I feel its uncertain performance gain 
>>>>>>>>>>> might
>>>>>>>>>>> not justify the implementation complexity added by it. Let me know 
>>>>>>>>>>> what you
>>>>>>>>>>> think about it.
>>>>>>>>>>> Additional Note:
>>>>>>>>>>>
>>>>>>>>>>> I think periodically rewriting indexes to match query patterns
>>>>>>>>>>> might not be a good idea, as it can be very costly for large 
>>>>>>>>>>> datasets.
>>>>>>>>>>> Also, the index file format will play a crucial role in key lookup
>>>>>>>>>>> performance. For example, Hudi uses HFile for indexing, which 
>>>>>>>>>>> supports fast
>>>>>>>>>>> point lookups thanks to its sorted key-value layout, built-in 
>>>>>>>>>>> caching, and
>>>>>>>>>>> Bloom filters.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Xiaoxuan
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 30, 2025 at 4:25 AM Péter Váry <
>>>>>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Xiaoxuan,
>>>>>>>>>>>> I hope you had a good time on your time off!
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your detailed response. I think it would help if we
>>>>>>>>>>>> focused on the specific use cases we want to support and what we 
>>>>>>>>>>>> ultimately
>>>>>>>>>>>> aim to achieve. By my understanding, there are a few distinct 
>>>>>>>>>>>> scenarios
>>>>>>>>>>>> we’ve been circling around:
>>>>>>>>>>>>
>>>>>>>>>>>>    1. Do we want writers to be able to easily write positional
>>>>>>>>>>>>    deletes instead of equality deletes?
>>>>>>>>>>>>    2. Do we want readers to easily convert equality deletes to
>>>>>>>>>>>>    positional deletes?
>>>>>>>>>>>>    3. Do we want readers to easily find records based on a
>>>>>>>>>>>>    primary key?
>>>>>>>>>>>>
>>>>>>>>>>>> Let me know if I’ve missed any important ones.
>>>>>>>>>>>>
>>>>>>>>>>>>  Personally, I see the first use case as the most critical, and
>>>>>>>>>>>> solving it makes the 2nd one obsolete. Please let me know about 
>>>>>>>>>>>> your
>>>>>>>>>>>> thoughts/preferences. It would help understand your point of view 
>>>>>>>>>>>> better.
>>>>>>>>>>>>
>>>>>>>>>>>> I also want to reiterate a point from my earlier comments:
>>>>>>>>>>>> > Notice that I used "index-partition". During the design we
>>>>>>>>>>>> can decide to use the table partitioning for index partitioning as 
>>>>>>>>>>>> well.
>>>>>>>>>>>> > That is why I was suggesting "index-partition"s. I don't have
>>>>>>>>>>>> a ready answer for this, but making sure that the index split fits 
>>>>>>>>>>>> into the
>>>>>>>>>>>> memory is important for Flink as well.
>>>>>>>>>>>>
>>>>>>>>>>>> So I agree with your point: for large partitions, directly
>>>>>>>>>>>> mapping them to "index-partition"s could be prohibitive.
>>>>>>>>>>>>
>>>>>>>>>>>> In discussions with Steven, we came up with a generic idea that
>>>>>>>>>>>> might help:
>>>>>>>>>>>>
>>>>>>>>>>>>    - Store index files as a new content file type in the
>>>>>>>>>>>>    manifest list, with column stats to support filtering during 
>>>>>>>>>>>> planning.
>>>>>>>>>>>>    - Periodically rewrite indexes to align with query patterns
>>>>>>>>>>>>    (e.g., bucketing by hashed PK).
>>>>>>>>>>>>    - We're only interested in unique indexes for now, so we
>>>>>>>>>>>>    can rely on snapshot sequence numbers instead of delete vectors.
>>>>>>>>>>>>    - The merging logic helps when writers produce index files
>>>>>>>>>>>>    with different granularities - we just read all relevant index 
>>>>>>>>>>>> files.
>>>>>>>>>>>>
>>>>>>>>>>>> That said, organizing both the index files and the records
>>>>>>>>>>>> within them to best support our query patterns remains key. That’s 
>>>>>>>>>>>> why
>>>>>>>>>>>> understanding the actual use cases is so important.
>>>>>>>>>>>>
>>>>>>>>>>>> For the Flink ingestion use case specifically, our best idea
>>>>>>>>>>>> for organizing the index is as follows:
>>>>>>>>>>>>
>>>>>>>>>>>>    - Generate ranges based on the primary key (PK) hash values.
>>>>>>>>>>>>    - Order records within each index file by PK hash.
>>>>>>>>>>>>
>>>>>>>>>>>> This structure allows us to locate a record with a given PK by
>>>>>>>>>>>> reading just one index file and accessing a single row group 
>>>>>>>>>>>> within the
>>>>>>>>>>>> corresponding index file. It’s a lightweight and performant 
>>>>>>>>>>>> approach that
>>>>>>>>>>>> aligns well with Flink’s streaming characteristics.
>>>>>>>>>>>>
>>>>>>>>>>>> Looking forward to your thoughts!
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Peter
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Xiaoxuan Li <xiaoxuan.li....@gmail.com> ezt írta (időpont:
>>>>>>>>>>>> 2025. máj. 29., Cs, 18:16):
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Peter, thanks for sharing the context around the Flink
>>>>>>>>>>>>> streaming use case and side note for concurrent write. Apologies 
>>>>>>>>>>>>> for the
>>>>>>>>>>>>> delay as I just got back from a vacation. Yeah, I agree, having 
>>>>>>>>>>>>> the index
>>>>>>>>>>>>> at the partition level is a better approach if we plan to use 
>>>>>>>>>>>>> caching. As a
>>>>>>>>>>>>> distributed cache would introduce additional system overhead, and
>>>>>>>>>>>>> maintaining separate caches on each node could lead to redundant 
>>>>>>>>>>>>> storage
>>>>>>>>>>>>> footprints.
>>>>>>>>>>>>>
>>>>>>>>>>>>> But having one index file for the entire partition is not
>>>>>>>>>>>>> feasible because paying the cost of rewriting the index file for 
>>>>>>>>>>>>> an entire
>>>>>>>>>>>>> partition on every write operation is expensive and could 
>>>>>>>>>>>>> introduce
>>>>>>>>>>>>> significant write latency. If we want to use indexes at planning 
>>>>>>>>>>>>> time, we
>>>>>>>>>>>>> would need a mapping from primary keys to data files and row 
>>>>>>>>>>>>> positions for
>>>>>>>>>>>>> the whole partition. This can be constructed by aggregating 
>>>>>>>>>>>>> file-level
>>>>>>>>>>>>> indexes at the planner. Even though each index file corresponds 
>>>>>>>>>>>>> to a single
>>>>>>>>>>>>> data file, we can use a thread pool to load them in parallel. 
>>>>>>>>>>>>> Compared to
>>>>>>>>>>>>> loading one large index file, the performance could be similar. 
>>>>>>>>>>>>> Since the
>>>>>>>>>>>>> index files are already cached in memory or on disk, index files 
>>>>>>>>>>>>> loading
>>>>>>>>>>>>> time becomes negligible. In fact, having one index file per data 
>>>>>>>>>>>>> file might
>>>>>>>>>>>>> be advantageous, as it allows for incremental loading.
>>>>>>>>>>>>>
>>>>>>>>>>>>> One thing about file-level indexing is that planner or task
>>>>>>>>>>>>> node is not required to scan all index files upfront. Partition 
>>>>>>>>>>>>> pruning and
>>>>>>>>>>>>> file filtering should occur prior to index access, allowing us to 
>>>>>>>>>>>>> load only
>>>>>>>>>>>>> the relevant index files associated with the pruned data set.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Another concern I have with the caching approach is
>>>>>>>>>>>>> determining which partitions to cache, since caching indexes for 
>>>>>>>>>>>>> the entire
>>>>>>>>>>>>> table isn't practical. For time-based partitions, this might be 
>>>>>>>>>>>>> more
>>>>>>>>>>>>> straightforward, for example, caching the most recent partitions. 
>>>>>>>>>>>>> However,
>>>>>>>>>>>>> that's not always applicable for all use cases.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If after prototyping, we find the solution isn’t performant
>>>>>>>>>>>>> enough for the streaming use case and we still want it to be 
>>>>>>>>>>>>> handled, we
>>>>>>>>>>>>> could explore a hybrid approach of Option 2 and Option 3 that 
>>>>>>>>>>>>> Anton
>>>>>>>>>>>>> mentioned.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also, thanks to Ismail for highlighting the BigQuery approach,
>>>>>>>>>>>>> that's helpful context!
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Xiaoxuan
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, May 14, 2025 at 3:39 AM ismail simsek <
>>>>>>>>>>>>> ismailxsim...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi All, Thank you for working on this.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I wanted to share a reference to the BigQuery implementation 
>>>>>>>>>>>>>> <https://cloud.google.com/bigquery/docs/change-data-capture#query-max-staleness>
>>>>>>>>>>>>>>  (Option 3) as another potential approach, and for inspiration.
>>>>>>>>>>>>>> In this setup, The engine is running periodic merge jobs and 
>>>>>>>>>>>>>> applying equality deletes to the actual table, based on PK. and 
>>>>>>>>>>>>>> for some cases applying it during runtime.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://cloud.google.com/bigquery/docs/change-data-capture#query-max-staleness
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best regards
>>>>>>>>>>>>>> ismail
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, May 14, 2025 at 7:37 AM Péter Váry <
>>>>>>>>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Xiaoxuan,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Let me describe, how the Flink streaming writer uses
>>>>>>>>>>>>>>> equality deletes, and how it could use indexes.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> When the Flink streaming writer receives a new insert, then
>>>>>>>>>>>>>>> it appends the data to a data file. When it receives a delete, 
>>>>>>>>>>>>>>> it appends
>>>>>>>>>>>>>>> the primary key to an equality delete file. When it receives an 
>>>>>>>>>>>>>>> update the
>>>>>>>>>>>>>>> it creates both a data and a delete record.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If we have an index available, the writer could rely on the
>>>>>>>>>>>>>>> index to get the position of the deleted record, and write a 
>>>>>>>>>>>>>>> position
>>>>>>>>>>>>>>> delete instead of an equality delete.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This index lookup is only effective, if we can keep the
>>>>>>>>>>>>>>> index in memory, and effectively distribute the incoming 
>>>>>>>>>>>>>>> records between
>>>>>>>>>>>>>>> the writers so the index cache is used. In this case, the 
>>>>>>>>>>>>>>> lookup cost
>>>>>>>>>>>>>>> becomes O(1) if we consider only the file access cost. If we 
>>>>>>>>>>>>>>> need to read
>>>>>>>>>>>>>>> an index file for every data file we are adding O(n) delay on 
>>>>>>>>>>>>>>> record level,
>>>>>>>>>>>>>>> where the n is the number of the data files in the table.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I agree with you that the cost of rewriting the index file
>>>>>>>>>>>>>>> is not trivial, but that happens on commit level. This is much 
>>>>>>>>>>>>>>> better than
>>>>>>>>>>>>>>> having an overhead on record level.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > As a result, we’d likely need to split and read the index
>>>>>>>>>>>>>>> file in distributed planning mode
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> That is why I was suggesting "index-partition"s. I don't
>>>>>>>>>>>>>>> have a ready answer for this, but making sure that the index 
>>>>>>>>>>>>>>> split fits
>>>>>>>>>>>>>>> into the memory is important for Flink as well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > However the drawback of partition-level index is that the
>>>>>>>>>>>>>>> index must always be kept up to date to remain useful.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't see how we can avoid that in case of Flink writers.
>>>>>>>>>>>>>>> The alternative is to read the non-indexed files in every 
>>>>>>>>>>>>>>> writer, which
>>>>>>>>>>>>>>> seems like a no-go for me.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > the value of an inverted index is not column pruning. It’s
>>>>>>>>>>>>>>> how it enables fast point lookups
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In my experience, the biggest gains are realized during
>>>>>>>>>>>>>>> planning, when the planner prunes whole files. While there is a 
>>>>>>>>>>>>>>> possibility
>>>>>>>>>>>>>>> for a distributed planner to read fewer number of index files, 
>>>>>>>>>>>>>>> I don't
>>>>>>>>>>>>>>> think it is possible for a planner to read the index files if 
>>>>>>>>>>>>>>> they are
>>>>>>>>>>>>>>> stored for every data file. (Unless we are talking about a 
>>>>>>>>>>>>>>> catalog which
>>>>>>>>>>>>>>> merges/caches the relevant part of the indexes. Sadly this is 
>>>>>>>>>>>>>>> something
>>>>>>>>>>>>>>> which is available for the Flink writers)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Side note: The index solution still prevents (or makes
>>>>>>>>>>>>>>> complicated) to handle concurrent writes to the table. 
>>>>>>>>>>>>>>> Currently, if a
>>>>>>>>>>>>>>> concurrent writer updated the record, Flink just updated it 
>>>>>>>>>>>>>>> again, and only
>>>>>>>>>>>>>>> a single record remains with the doubly updated primary key. 
>>>>>>>>>>>>>>> With the index
>>>>>>>>>>>>>>> based solution, we might end up with duplicated keys. This 
>>>>>>>>>>>>>>> might be an
>>>>>>>>>>>>>>> acceptable tradeoff, but we should be aware of it.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for working on this Xiaoxuan!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, May 14, 2025, 05:25 Xiaoxuan Li <
>>>>>>>>>>>>>>> xiaoxuan.li....@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Peter,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the detailed illustration. I understand your
>>>>>>>>>>>>>>>> concern.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I believe the core question here is whether the index is
>>>>>>>>>>>>>>>> used during job planning or at the scan task. This depends on 
>>>>>>>>>>>>>>>> how index
>>>>>>>>>>>>>>>> files are referenced, at the file level or partition level.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In my view, both approaches ultimately serve the same
>>>>>>>>>>>>>>>> purpose. The main difference lies in how the index files are 
>>>>>>>>>>>>>>>> generated and
>>>>>>>>>>>>>>>> split.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For a partition-level index, would we generate a single
>>>>>>>>>>>>>>>> index file per partition? If so, each update to the partition 
>>>>>>>>>>>>>>>> would require
>>>>>>>>>>>>>>>> either rewriting a whole new index file, which could be costly 
>>>>>>>>>>>>>>>> at write
>>>>>>>>>>>>>>>> time given that the index size grows along with data size, or 
>>>>>>>>>>>>>>>> appending a
>>>>>>>>>>>>>>>> new index file per write operation, which would functionally 
>>>>>>>>>>>>>>>> be similar to
>>>>>>>>>>>>>>>> a file-level index.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> And due to the potentially large size of index files, even
>>>>>>>>>>>>>>>> one index file per partition may not support efficient 
>>>>>>>>>>>>>>>> planning in local
>>>>>>>>>>>>>>>> planning mode. As a result, we’d likely need to split and read 
>>>>>>>>>>>>>>>> the index
>>>>>>>>>>>>>>>> file in distributed planning mode, making it functionally 
>>>>>>>>>>>>>>>> equivalent to
>>>>>>>>>>>>>>>> reading multiple index files at the task level.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> However the drawback of partition-level index is that the
>>>>>>>>>>>>>>>> index must always be kept up to date to remain useful.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Also, Steven, in my opinion, the value of an inverted index
>>>>>>>>>>>>>>>> is not column pruning. It’s how it enables fast point lookups. 
>>>>>>>>>>>>>>>> As mentioned
>>>>>>>>>>>>>>>> earlier, we just need to ensure that index lookups are faster 
>>>>>>>>>>>>>>>> than file
>>>>>>>>>>>>>>>> scan with delete predicate evaluation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Let me know whether this addresses any of the concerns.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Xiaoxuan
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, May 12, 2025 at 4:34 PM Steven Wu <
>>>>>>>>>>>>>>>> stevenz...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> agree with Peter that 1:1 mapping of data files and
>>>>>>>>>>>>>>>>> inverted indexes are not as useful. With columnar format like 
>>>>>>>>>>>>>>>>> Parquet, this
>>>>>>>>>>>>>>>>> can also be achieved equivalently by reading the data file 
>>>>>>>>>>>>>>>>> with projection
>>>>>>>>>>>>>>>>> on the identifier columns.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, May 12, 2025 at 4:20 AM Péter Váry <
>>>>>>>>>>>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Xiaoxuan,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Do we plan to store the indexes in a separate file
>>>>>>>>>>>>>>>>>> alongside the data files? If so, then I have the following 
>>>>>>>>>>>>>>>>>> thoughts:
>>>>>>>>>>>>>>>>>> - I agree that the 1-on-1 mapping of data files and index
>>>>>>>>>>>>>>>>>> files is easy to maintain OTOH it is less useful as an index.
>>>>>>>>>>>>>>>>>> - The writer (which is looking for a column with a
>>>>>>>>>>>>>>>>>> specific primary key) needs to open all of the index files 
>>>>>>>>>>>>>>>>>> until it finds
>>>>>>>>>>>>>>>>>> the given key. Since the index files are typically small, 
>>>>>>>>>>>>>>>>>> the cost here is
>>>>>>>>>>>>>>>>>> O(n) where n is the number of the index files (equal to the 
>>>>>>>>>>>>>>>>>> number of the
>>>>>>>>>>>>>>>>>> data files).
>>>>>>>>>>>>>>>>>> - If we add a Bloom filter on the primary key to the data
>>>>>>>>>>>>>>>>>> files, then the writer reads the footer of every data file 
>>>>>>>>>>>>>>>>>> and reads only
>>>>>>>>>>>>>>>>>> the file which contains the primary key. This is also O(n) 
>>>>>>>>>>>>>>>>>> where the n is
>>>>>>>>>>>>>>>>>> the number of the data files.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So, IMHO having a 1-on-1 mapping between the data files
>>>>>>>>>>>>>>>>>> and the index files is not too beneficial.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> OTOH if we maintain the index files on "index-partition"
>>>>>>>>>>>>>>>>>> level, then the cost becomes O(p) where the p is the number 
>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>> "index-partitions" which could be significantly lower. 
>>>>>>>>>>>>>>>>>> Notice that I used
>>>>>>>>>>>>>>>>>> "index-partition". During the design we can decide to use 
>>>>>>>>>>>>>>>>>> the table
>>>>>>>>>>>>>>>>>> partitioning for index partitioning as well. This requires 
>>>>>>>>>>>>>>>>>> us to use easily
>>>>>>>>>>>>>>>>>> maintainable indexes.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So while maintaining the index is more costly if it
>>>>>>>>>>>>>>>>>> contains data from multiple data files, it also becomes much 
>>>>>>>>>>>>>>>>>> more useful.
>>>>>>>>>>>>>>>>>> Maintenance procedures could build upon the data file 
>>>>>>>>>>>>>>>>>> immutability and
>>>>>>>>>>>>>>>>>> simplify the index maintenance.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks Xiaoxuan for the good conversation!
>>>>>>>>>>>>>>>>>> Peter
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Xiaoxuan Li <xiaoxuan.li....@gmail.com> ezt írta
>>>>>>>>>>>>>>>>>> (időpont: 2025. máj. 10., Szo, 9:13):
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks Anton for the context and summary of the options,
>>>>>>>>>>>>>>>>>>> great to hear that this direction aligns with earlier 
>>>>>>>>>>>>>>>>>>> community
>>>>>>>>>>>>>>>>>>> discussions. And thanks Gyula and Peter for the clear 
>>>>>>>>>>>>>>>>>>> analysis. I agree
>>>>>>>>>>>>>>>>>>> with both of you, the index needs to be designed and 
>>>>>>>>>>>>>>>>>>> implemented
>>>>>>>>>>>>>>>>>>> efficiently in order to scale for large data sets and 
>>>>>>>>>>>>>>>>>>> streaming use cases.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I wanted to share some context around why I proposed a
>>>>>>>>>>>>>>>>>>> file-level index.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The benefit of a file-level index is that it naturally
>>>>>>>>>>>>>>>>>>> supports partition pruning, file skipping and time travel. 
>>>>>>>>>>>>>>>>>>> Issues like
>>>>>>>>>>>>>>>>>>> index invalidation are inherently handled because the index 
>>>>>>>>>>>>>>>>>>> is tied to the
>>>>>>>>>>>>>>>>>>> data file’s lifecycle. Even if the index isn't always 
>>>>>>>>>>>>>>>>>>> perfectly in sync,
>>>>>>>>>>>>>>>>>>> it's still usable, and since its lifecycle is bound to the 
>>>>>>>>>>>>>>>>>>> file, meaning
>>>>>>>>>>>>>>>>>>> lifecycle management only needs to track with the file 
>>>>>>>>>>>>>>>>>>> itself, reducing
>>>>>>>>>>>>>>>>>>> complexity in index management and maintenance.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On the other hand, partition/table level indexes must
>>>>>>>>>>>>>>>>>>> explicitly manage synchronization with the table, handling
>>>>>>>>>>>>>>>>>>> updates/snapshots on the writer side, and partition 
>>>>>>>>>>>>>>>>>>> pruning, file skipping,
>>>>>>>>>>>>>>>>>>> time travel, and index invalidation on the reader side. 
>>>>>>>>>>>>>>>>>>> Since index files
>>>>>>>>>>>>>>>>>>> are immutable and grow alongside data files, maintaining 
>>>>>>>>>>>>>>>>>>> them can become as
>>>>>>>>>>>>>>>>>>> complex as managing the entire table. It’s not surprising 
>>>>>>>>>>>>>>>>>>> that Hudi uses a
>>>>>>>>>>>>>>>>>>> metadata table(which is essentially a Hudi MOR table) to 
>>>>>>>>>>>>>>>>>>> manage its index.
>>>>>>>>>>>>>>>>>>> Personally, I find this approach less appealing, as it 
>>>>>>>>>>>>>>>>>>> introduces a
>>>>>>>>>>>>>>>>>>> circular dependency that adds architectural complexity.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For the positional delete case, even if index lookups
>>>>>>>>>>>>>>>>>>> could be faster than scanning files with predicates, it's 
>>>>>>>>>>>>>>>>>>> still unclear
>>>>>>>>>>>>>>>>>>> whether that alone is sufficient for streaming workloads, 
>>>>>>>>>>>>>>>>>>> especially when
>>>>>>>>>>>>>>>>>>> many files are involved. I think Peter’s idea of 
>>>>>>>>>>>>>>>>>>> maintaining a hot cache of
>>>>>>>>>>>>>>>>>>> the index within the streaming engine is promising. 
>>>>>>>>>>>>>>>>>>> Alternatively, using an
>>>>>>>>>>>>>>>>>>> external key-value store for fast lookups could also be 
>>>>>>>>>>>>>>>>>>> explored. Would be
>>>>>>>>>>>>>>>>>>> great to hear others’ thoughts on this.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Xiaoxuan
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, May 9, 2025 at 8:12 AM Péter Váry <
>>>>>>>>>>>>>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> When going through the options mentioned by Anton, I
>>>>>>>>>>>>>>>>>>>> feel that Option 1 and 4 are just pushing the 
>>>>>>>>>>>>>>>>>>>> responsibility of converting
>>>>>>>>>>>>>>>>>>>> the equality deletes to positional deletes to the engine 
>>>>>>>>>>>>>>>>>>>> side. The only
>>>>>>>>>>>>>>>>>>>> difference is whether the conversion happens on the write 
>>>>>>>>>>>>>>>>>>>> side or on the
>>>>>>>>>>>>>>>>>>>> read side. This is a step back, and doesn't help solving 
>>>>>>>>>>>>>>>>>>>> the problem for
>>>>>>>>>>>>>>>>>>>> streaming engines.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Option 2 needs a bit of thought. We need to find a good
>>>>>>>>>>>>>>>>>>>> indexing strategy which allows big tables and plays nice 
>>>>>>>>>>>>>>>>>>>> with streaming
>>>>>>>>>>>>>>>>>>>> writes too. It would be good to allow the engines to cache 
>>>>>>>>>>>>>>>>>>>> a limited part
>>>>>>>>>>>>>>>>>>>> of the index, and distribute records to the writers in a 
>>>>>>>>>>>>>>>>>>>> way that the cache
>>>>>>>>>>>>>>>>>>>> locality is considered. Also creating an index file for 
>>>>>>>>>>>>>>>>>>>> every data file
>>>>>>>>>>>>>>>>>>>> might be suboptimal, as reading that many index files 
>>>>>>>>>>>>>>>>>>>> could tank
>>>>>>>>>>>>>>>>>>>> performance.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I have yet to see a good idea for Option 3.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Another option could be to provide a way to convert the
>>>>>>>>>>>>>>>>>>>> equality deletes to positional deletes as soon as 
>>>>>>>>>>>>>>>>>>>> possible. Maybe in
>>>>>>>>>>>>>>>>>>>> frequent compaction tasks, or at first read? When we first 
>>>>>>>>>>>>>>>>>>>> apply an
>>>>>>>>>>>>>>>>>>>> equality delete for a file, it is very easy to calculate 
>>>>>>>>>>>>>>>>>>>> the equivalent DV
>>>>>>>>>>>>>>>>>>>> for the equality delete. Subsequent readers could depend 
>>>>>>>>>>>>>>>>>>>> on the DV instead
>>>>>>>>>>>>>>>>>>>> of the equality delete. This could be the least disruptive 
>>>>>>>>>>>>>>>>>>>> change, but I
>>>>>>>>>>>>>>>>>>>> see a few issues with this solution as well:
>>>>>>>>>>>>>>>>>>>> - First read still depends on the equality delete which
>>>>>>>>>>>>>>>>>>>> in edge cases could be very costly
>>>>>>>>>>>>>>>>>>>> - Filters don't play well with this method
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> All-in-all, I like the index solution (Option 2) for
>>>>>>>>>>>>>>>>>>>> several reasons:
>>>>>>>>>>>>>>>>>>>> - Cuts out the need for equality deletes which would
>>>>>>>>>>>>>>>>>>>> reduce the code quite a bit
>>>>>>>>>>>>>>>>>>>> - While slows down the write path a bit, the overall
>>>>>>>>>>>>>>>>>>>> workflow (write+read) benefits from it
>>>>>>>>>>>>>>>>>>>> - The index could be reused for other use-cases as well
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Peter
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Gyula Fóra <gyula.f...@gmail.com> ezt írta (időpont:
>>>>>>>>>>>>>>>>>>>> 2025. máj. 9., P, 12:16):
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Anton,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thank you for summarizing the options we see at this
>>>>>>>>>>>>>>>>>>>>> stage in a structured and concise way.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Based on the use-cases I see in the industry, I feel
>>>>>>>>>>>>>>>>>>>>> that not all of the highlighted options are feasible (or 
>>>>>>>>>>>>>>>>>>>>> desirable).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Option 4 would basically remove any possibilities for
>>>>>>>>>>>>>>>>>>>>> native streaming CDC reads on tables, severely limiting 
>>>>>>>>>>>>>>>>>>>>> how Iceberg can be
>>>>>>>>>>>>>>>>>>>>> used in the future for real-time use-cases from Flink and 
>>>>>>>>>>>>>>>>>>>>> other similar
>>>>>>>>>>>>>>>>>>>>> engines that may want to connect. I understand that the 
>>>>>>>>>>>>>>>>>>>>> view reconciliation
>>>>>>>>>>>>>>>>>>>>> approach is implemented in Spark SQL already but 
>>>>>>>>>>>>>>>>>>>>> implementing it in a
>>>>>>>>>>>>>>>>>>>>> proper streaming way would likely lead to similar 
>>>>>>>>>>>>>>>>>>>>> problems that we are
>>>>>>>>>>>>>>>>>>>>> trying to solve here in the first place.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Regarding Option 1, introducing engine specific/custom
>>>>>>>>>>>>>>>>>>>>> indexing solutions would go against the core design 
>>>>>>>>>>>>>>>>>>>>> principles as it would
>>>>>>>>>>>>>>>>>>>>> be hard to mix different engines when writing / reading 
>>>>>>>>>>>>>>>>>>>>> tables for CDC
>>>>>>>>>>>>>>>>>>>>> use-cases. (A streaming job would have a hard time 
>>>>>>>>>>>>>>>>>>>>> writing upserts/equality
>>>>>>>>>>>>>>>>>>>>> deletes to tables that were written by a different 
>>>>>>>>>>>>>>>>>>>>> engine). To
>>>>>>>>>>>>>>>>>>>>> me this sounds very similar to Option 4 in a way that it 
>>>>>>>>>>>>>>>>>>>>> pushes too much
>>>>>>>>>>>>>>>>>>>>> logic to the engines in a way that would hurt the 
>>>>>>>>>>>>>>>>>>>>> compatibility across the
>>>>>>>>>>>>>>>>>>>>> engines.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Based on this and the context of the discussion,
>>>>>>>>>>>>>>>>>>>>> Option 2 or a combination of Option 2 & 3 sounds the most 
>>>>>>>>>>>>>>>>>>>>> reasonable to me.
>>>>>>>>>>>>>>>>>>>>> There are still a lot of questions on the practical 
>>>>>>>>>>>>>>>>>>>>> implementation of the
>>>>>>>>>>>>>>>>>>>>> indexes and how we can do this efficiently so this is 
>>>>>>>>>>>>>>>>>>>>> only a very early
>>>>>>>>>>>>>>>>>>>>> feedback from my end.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, May 9, 2025 at 12:14 AM Anton Okolnychyi <
>>>>>>>>>>>>>>>>>>>>> aokolnyc...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I am glad to see that folks are thinking about this
>>>>>>>>>>>>>>>>>>>>>> problem. I am looking forward to a formal 
>>>>>>>>>>>>>>>>>>>>>> proposal/design doc to discuss
>>>>>>>>>>>>>>>>>>>>>> details!
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Overall, this aligns with what we discussed in the
>>>>>>>>>>>>>>>>>>>>>> community earlier w.r.t. the future of equality deletes 
>>>>>>>>>>>>>>>>>>>>>> and streaming
>>>>>>>>>>>>>>>>>>>>>> upserts. If I were to summarize, we have the following 
>>>>>>>>>>>>>>>>>>>>>> options:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Option 1: Add an inverted index (potentially
>>>>>>>>>>>>>>>>>>>>>> distributed) maintained by an engine that does streaming 
>>>>>>>>>>>>>>>>>>>>>> writes to always
>>>>>>>>>>>>>>>>>>>>>> produce DVs, even in streaming use cases. 
>>>>>>>>>>>>>>>>>>>>>> Deprecate/remove equality deletes
>>>>>>>>>>>>>>>>>>>>>> from Iceberg.
>>>>>>>>>>>>>>>>>>>>>> Option 2: Add native indexing to Iceberg so that the
>>>>>>>>>>>>>>>>>>>>>> lookup of positions is quick and efficient enough to be 
>>>>>>>>>>>>>>>>>>>>>> used in streaming
>>>>>>>>>>>>>>>>>>>>>> upserts. Deprecate/remove equality deletes from Iceberg.
>>>>>>>>>>>>>>>>>>>>>> Option 3: Rethink equality deletes, potentially by
>>>>>>>>>>>>>>>>>>>>>> introducing more restrictions and trying to scope them 
>>>>>>>>>>>>>>>>>>>>>> to particular data
>>>>>>>>>>>>>>>>>>>>>> files, similar to DVs.
>>>>>>>>>>>>>>>>>>>>>> Option 4: Standardize on a view reconciliation
>>>>>>>>>>>>>>>>>>>>>> approach that Tabular implemented for CDC.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I would like to highlight that what Spark does today
>>>>>>>>>>>>>>>>>>>>>> during MERGE is similar to a lookup in an inverted index 
>>>>>>>>>>>>>>>>>>>>>> represented by
>>>>>>>>>>>>>>>>>>>>>> another Iceberg table. That is OK for batch jobs but not 
>>>>>>>>>>>>>>>>>>>>>> enough for
>>>>>>>>>>>>>>>>>>>>>> streaming.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> - Anton
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> чт, 8 трав. 2025 р. о 10:08 Xiaoxuan Li <
>>>>>>>>>>>>>>>>>>>>>> xiaoxuan.li....@gmail.com> пише:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi Zheng, Steven, Amogh and Gyula. Thank you all for
>>>>>>>>>>>>>>>>>>>>>>> the feedback!
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I agree with everyone, we need to narrow down the
>>>>>>>>>>>>>>>>>>>>>>> scope of this optimization. The primary issue I'm 
>>>>>>>>>>>>>>>>>>>>>>> trying to address is the
>>>>>>>>>>>>>>>>>>>>>>> slow read performance caused by the growing number of 
>>>>>>>>>>>>>>>>>>>>>>> equality delete
>>>>>>>>>>>>>>>>>>>>>>> files(streaming CDC scenarios). The other potential use 
>>>>>>>>>>>>>>>>>>>>>>> cases are only
>>>>>>>>>>>>>>>>>>>>>>> mentioned to show the extensibility of this approach.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> And both equality and positional deletes suffer from
>>>>>>>>>>>>>>>>>>>>>>> the same core problem, records are evaluated repeatedly 
>>>>>>>>>>>>>>>>>>>>>>> against multiple
>>>>>>>>>>>>>>>>>>>>>>> delete predicates, at read time for equality deletes, 
>>>>>>>>>>>>>>>>>>>>>>> and at write time for
>>>>>>>>>>>>>>>>>>>>>>> positional deletes. This repeated evaluation is where 
>>>>>>>>>>>>>>>>>>>>>>> the real bottleneck
>>>>>>>>>>>>>>>>>>>>>>> lies.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> It’s particularly bad for equality deletes, since we
>>>>>>>>>>>>>>>>>>>>>>> constantly recompute row positions during reads without 
>>>>>>>>>>>>>>>>>>>>>>> ever materializing
>>>>>>>>>>>>>>>>>>>>>>> them. Eventually, a maintenance job is required just to 
>>>>>>>>>>>>>>>>>>>>>>> rewrite them into
>>>>>>>>>>>>>>>>>>>>>>> positional deletes.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> The idea is to eliminate this overhead by
>>>>>>>>>>>>>>>>>>>>>>> introducing an inverted index that maps values (or 
>>>>>>>>>>>>>>>>>>>>>>> value combinations)
>>>>>>>>>>>>>>>>>>>>>>> directly to row positions. This lets us skip full 
>>>>>>>>>>>>>>>>>>>>>>> predicate evaluation and
>>>>>>>>>>>>>>>>>>>>>>> jump straight to the affected rows, similar to how Hudi 
>>>>>>>>>>>>>>>>>>>>>>> uses a record-level
>>>>>>>>>>>>>>>>>>>>>>> index for fast upserts. If we can fetch row positions 
>>>>>>>>>>>>>>>>>>>>>>> from the index, we
>>>>>>>>>>>>>>>>>>>>>>> can greatly reduce overhead during reads (for equality 
>>>>>>>>>>>>>>>>>>>>>>> deletes) and writes
>>>>>>>>>>>>>>>>>>>>>>> (for positional deletes).
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> In fact, if we can make positional deletes perform
>>>>>>>>>>>>>>>>>>>>>>> as well as equality deletes using this index, we might 
>>>>>>>>>>>>>>>>>>>>>>> be able to get rid
>>>>>>>>>>>>>>>>>>>>>>> of equality deletes, but that needs to be evaluated for 
>>>>>>>>>>>>>>>>>>>>>>> the upsert case.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> That’s part of the reason why other types of indexes
>>>>>>>>>>>>>>>>>>>>>>> came up, they’re applicable beyond just primary key 
>>>>>>>>>>>>>>>>>>>>>>> columns, but CDC is the
>>>>>>>>>>>>>>>>>>>>>>> only scenario with stricter SLA requirements. So it 
>>>>>>>>>>>>>>>>>>>>>>> makes sense to align on
>>>>>>>>>>>>>>>>>>>>>>> the exact problem and use cases. For now, I think we 
>>>>>>>>>>>>>>>>>>>>>>> can define our main
>>>>>>>>>>>>>>>>>>>>>>> goal as supporting inverted indexing over primary key 
>>>>>>>>>>>>>>>>>>>>>>> columns to address
>>>>>>>>>>>>>>>>>>>>>>> the slowness of reading caused by the growing number of 
>>>>>>>>>>>>>>>>>>>>>>> equality delete
>>>>>>>>>>>>>>>>>>>>>>> files.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks, Amogh, for bringing up the comparison with
>>>>>>>>>>>>>>>>>>>>>>> known alternatives. We should include benchmarks for 
>>>>>>>>>>>>>>>>>>>>>>> those as well, to
>>>>>>>>>>>>>>>>>>>>>>> illustrate the trade-offs in read/write performance, 
>>>>>>>>>>>>>>>>>>>>>>> storage usage, and
>>>>>>>>>>>>>>>>>>>>>>> overall cost.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Really appreciate your feedback! I’ll incorporate
>>>>>>>>>>>>>>>>>>>>>>> these into the next revision of the proposal.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Xiaoxuan
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 8, 2025 at 1:25 AM Gyula Fóra <
>>>>>>>>>>>>>>>>>>>>>>> gyula.f...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thank you for the proposal!
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I agree with what had been said above that we need
>>>>>>>>>>>>>>>>>>>>>>>> to narrow down the scope here and what is the primary 
>>>>>>>>>>>>>>>>>>>>>>>> target for the
>>>>>>>>>>>>>>>>>>>>>>>> optimization.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> As Amogh has also pointed out, CDC (streaming) read
>>>>>>>>>>>>>>>>>>>>>>>> performance (with equality deletes) would be one of 
>>>>>>>>>>>>>>>>>>>>>>>> the biggest
>>>>>>>>>>>>>>>>>>>>>>>> beneficiaries of this at a first glance.
>>>>>>>>>>>>>>>>>>>>>>>> This is especially important for Flink users where
>>>>>>>>>>>>>>>>>>>>>>>> this feature is currently completely missing and there 
>>>>>>>>>>>>>>>>>>>>>>>> is a big demand for
>>>>>>>>>>>>>>>>>>>>>>>> it as we rely on equality deletes on the write path. 
>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I am not aware of alternative proposals that would
>>>>>>>>>>>>>>>>>>>>>>>> solve the equality delete cdc read performance 
>>>>>>>>>>>>>>>>>>>>>>>> question, overall I think
>>>>>>>>>>>>>>>>>>>>>>>> using indices is reasonable and a very promising 
>>>>>>>>>>>>>>>>>>>>>>>> approach.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to more details and discussion!
>>>>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>>> https://lists.apache.org/thread/njmxjmjjm341fp4mgynn483v15mhk7qd
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Thu, May 8, 2025 at 9:24 AM Amogh Jahagirdar <
>>>>>>>>>>>>>>>>>>>>>>>> 2am...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thank you for the proposal Xiaoxuan! I think I
>>>>>>>>>>>>>>>>>>>>>>>>> agree with Zheng and Steven's point that it'll 
>>>>>>>>>>>>>>>>>>>>>>>>> probably be more helpful to
>>>>>>>>>>>>>>>>>>>>>>>>> start out with more specific "what" and "why" (known 
>>>>>>>>>>>>>>>>>>>>>>>>> areas of improvement
>>>>>>>>>>>>>>>>>>>>>>>>> for Iceberg and driven by any use cases) before we 
>>>>>>>>>>>>>>>>>>>>>>>>> get too deep into the
>>>>>>>>>>>>>>>>>>>>>>>>> "how".
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> In my mind, the specific known area of improvement
>>>>>>>>>>>>>>>>>>>>>>>>> for Iceberg related to this proposal is improving 
>>>>>>>>>>>>>>>>>>>>>>>>> streaming upsert
>>>>>>>>>>>>>>>>>>>>>>>>> behavior. One area this improvement is beneficial for 
>>>>>>>>>>>>>>>>>>>>>>>>> is being able to
>>>>>>>>>>>>>>>>>>>>>>>>> provide better data freshness for Iceberg CDC mirror 
>>>>>>>>>>>>>>>>>>>>>>>>> tables without the
>>>>>>>>>>>>>>>>>>>>>>>>> heavy read + maintenance cost that currently exist 
>>>>>>>>>>>>>>>>>>>>>>>>> with Flink upserts.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> As you mentioned, equality deletes have the
>>>>>>>>>>>>>>>>>>>>>>>>> benefit of being very cheap to write but can come at 
>>>>>>>>>>>>>>>>>>>>>>>>> a high and
>>>>>>>>>>>>>>>>>>>>>>>>> unpredictable cost at read time. Challenges with 
>>>>>>>>>>>>>>>>>>>>>>>>> equality deletes have been
>>>>>>>>>>>>>>>>>>>>>>>>> discussed in the past [1].
>>>>>>>>>>>>>>>>>>>>>>>>> I'll also add that if one of the goals is to
>>>>>>>>>>>>>>>>>>>>>>>>> improving streaming upserts (e.g. for applying CDC 
>>>>>>>>>>>>>>>>>>>>>>>>> change streams into
>>>>>>>>>>>>>>>>>>>>>>>>> Iceberg mirror tables), then there are alternatives 
>>>>>>>>>>>>>>>>>>>>>>>>> that I think we should
>>>>>>>>>>>>>>>>>>>>>>>>> compare against to make
>>>>>>>>>>>>>>>>>>>>>>>>> the tradeoffs clear. These alternatives include
>>>>>>>>>>>>>>>>>>>>>>>>> leveraging the known changelog view or merge patterns 
>>>>>>>>>>>>>>>>>>>>>>>>> [2] or improving the
>>>>>>>>>>>>>>>>>>>>>>>>> existing maintenance procedures.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I think the potential for being able to use a
>>>>>>>>>>>>>>>>>>>>>>>>> inverted index for upsert cases to more directly 
>>>>>>>>>>>>>>>>>>>>>>>>> identify positions in a
>>>>>>>>>>>>>>>>>>>>>>>>> file to directly write DVs  is very exciting, but 
>>>>>>>>>>>>>>>>>>>>>>>>> before getting too far
>>>>>>>>>>>>>>>>>>>>>>>>> into the weeds, I think it'd first be helpful
>>>>>>>>>>>>>>>>>>>>>>>>> to make sure we agree on the specific problem
>>>>>>>>>>>>>>>>>>>>>>>>> we're trying to solve when we talk about performance 
>>>>>>>>>>>>>>>>>>>>>>>>> improvements along
>>>>>>>>>>>>>>>>>>>>>>>>> with any use cases, followed by comparison with known 
>>>>>>>>>>>>>>>>>>>>>>>>> alternatives (ideally
>>>>>>>>>>>>>>>>>>>>>>>>> we can get numbers that demonstrate the 
>>>>>>>>>>>>>>>>>>>>>>>>> read/write/storage/cost tradeoffs
>>>>>>>>>>>>>>>>>>>>>>>>> for the proposed inverted index).
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>>>> https://lists.apache.org/thread/z0gvco6hn2bpgngvk4h6xqrnw8b32sw6
>>>>>>>>>>>>>>>>>>>>>>>>> [2]https://www.tabular.io/blog/hello-world-of-cdc/
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>> Amogh Jahagirdar
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>

Reply via email to