>Isn’t the reality here that repairing a single partition in the base table is potentially a full cluster-wide scan of the MV if you also want to detect rows in the MV that don’t exist in the base table (e.g. resurrection or a missed delete)

Exactly. Since materialized views (MVs) are partitioned differently from their base tables, there doesn’t appear to be a more efficient way to repair them in a targeted manner—meaning we can’t restrict the repair to only a small portion of the data.

Jaydeep
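To make the above concrete, here is a toy Java sketch. It is purely illustrative: the class name, the in-memory maps, and the orphaned row are invented for this example and none of it is Cassandra or CEP-48 code. Because the view is keyed by v1, a view row that should have been deleted for a given base id can sit in any view partition, so checking even a single repaired base partition for orphans means walking every view row (or maintaining an extra index keyed by base id, which is the "another tier" Jeff refers to).

import java.util.List;
import java.util.Map;

// Toy model of a base table (id -> v1) and its MV ((v1, id)), showing why a
// per-partition base repair cannot bound the view-side check: view rows that
// reference a given base id are scattered across the view's token space.
public class OrphanedViewRowScanSketch {
    public static void main(String[] args) {
        // Base table rows: id -> v1.
        Map<Integer, Integer> base = Map.of(10, 11, 1, 14, 19, 10, 2, 14, 3, 14);

        // MV rows as (v1, id). The entry (99, 2) is an orphan, e.g. left behind by a missed delete.
        List<int[]> view = List.of(new int[]{11, 10}, new int[]{14, 1}, new int[]{10, 19},
                                   new int[]{14, 2}, new int[]{14, 3}, new int[]{99, 2});

        int repairedBaseId = 2; // we only want to repair this single base partition...
        for (int[] viewRow : view) { // ...but the view is not ordered by id, so every view row must be visited
            int v1 = viewRow[0], id = viewRow[1];
            if (id == repairedBaseId && !Integer.valueOf(v1).equals(base.get(id)))
                System.out.println("orphaned view row (" + v1 + ", " + id + ") must be removed");
        }
    }
}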
On Sun, May 18, 2025 at 5:57 PM Jeff Jirsa <jji...@gmail.com> wrote: > Isn’t the reality here that repairing a single partition in the base > table is potentially a full cluster-wide scan of the MV if you also want to > detect rows in the MV that don’t exist in the base table (e.g. resurrection > or a missed delete) > > There’s no getting around that. Keeping an extra index doesn’t avoid that > scan, it just moves the problem around to another tier. > > > > On May 18, 2025, at 4:59 PM, Blake Eggleston <bl...@ultrablake.com> wrote: > > > Whether it’s index based repair or another mechanism, I think the proposed > repair design needs to be refined. The requirement of a global snapshot and > merkle tree build before we can start detecting and fixing problems is a > pretty big limitation. > > > Data scans during repair would become random disk accesses instead of > sequential ones, which can degrade performance. > > You’d only be reading and comparing the index files, not the sstable > contents. Reads would still be sequential. > > > Most importantly, I decided against this approach due to the complexity > of ensuring index consistency. Introducing secondary indexes opens up new > challenges, such as keeping them in sync with the actual data. > > I think this is mostly a solved problem in C*? If you had a custom SAI > index or something, this isn’t something you’d need to worry about AFAIK. > > On Sat, May 17, 2025, at 4:57 PM, Runtian Liu wrote: > > > I think you could exploit this to improve your MV repair design. Instead > of taking global snapshots and persisting merkle trees, you could implement > a set of secondary indexes on the base and view tables that you could > quickly compare the contents of for repair. > > We actually considered this approach while designing the MV repair. > However, there are several downsides: > > 1. It requires additional storage for the index files. > 2. Data scans during repair would become random disk accesses instead of > sequential ones, which can degrade performance. > 3. Most importantly, I decided against this approach due to the > complexity of ensuring index consistency. Introducing secondary indexes > opens up new challenges, such as keeping them in sync with the actual data. > > The goal of the design is to provide a catch-all mismatch detection > mechanism that targets the dataset users query during the online path. I > did consider adding indexes at the SSTable level to guarantee consistency > between indexes and data. > > sorted by base table partition order, but segmented by view partition > ranges > If the indexes are at the SSTable level, they will be less flexible: we > would need to rewrite the SSTables if we decide to change the view partition > ranges. > I didn’t explore this direction further due to the issues listed above. > > > The transformative repair could be done against the local index, and the > local index can repair against the global index. It opens up a lot of > possibilities, query wise, as well. > This is something I’m not entirely sure about—how exactly do we use the > local index to support the global index (i.e., the MV)? If the MV relies on > local indexes during the query path, we can definitely dig deeper into how > repair could work with that design. > > The proposed design in this CEP aims to treat the base table and its MV > like any other regular tables, so that operations such as compaction and > repair can be handled in the same way in most cases. 
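For readers trying to picture the index-based alternative discussed above, here is a rough sketch under stated assumptions: the ViewSegmentedIndexSketch class and its fields are invented for illustration and are not part of Cassandra, SAI, or CEP-48. It only shows the shape Blake describes, entries kept in base-table scan order but segmented by view partition range, so that a base <-> view comparison touches just the intersecting slices.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical index layout: each entry records which view token bucket a row maps
// to, the base token it came from, and a hash of the row contents. Entries arrive
// in base-table scan order and are grouped ("segmented") by view bucket.
public class ViewSegmentedIndexSketch {

    record IndexEntry(int viewBucket, long baseToken, long rowHash) {}

    // Group entries by view bucket; within a bucket they stay in the order a
    // sequential base-table scan produced them.
    static Map<Integer, List<IndexEntry>> buildIndex(List<IndexEntry> scannedInBaseOrder) {
        Map<Integer, List<IndexEntry>> segments = new TreeMap<>();
        for (IndexEntry e : scannedInBaseOrder)
            segments.computeIfAbsent(e.viewBucket(), b -> new ArrayList<>()).add(e);
        return segments;
    }

    // Compare one view-bucket slice from the base-side index against the same slice
    // from the view-side index by xor-ing row hashes; a mismatch narrows repair to
    // that slice only, without touching sstable contents.
    static boolean sliceMatches(List<IndexEntry> baseSlice, List<IndexEntry> viewSlice) {
        long baseDigest = 0, viewDigest = 0;
        for (IndexEntry e : baseSlice) baseDigest ^= e.rowHash();
        for (IndexEntry e : viewSlice) viewDigest ^= e.rowHash();
        return baseDigest == viewDigest;
    }

    public static void main(String[] args) {
        List<IndexEntry> baseScan = List.of(
                new IndexEntry(0, 1L, 101L), new IndexEntry(1, 2L, 202L), new IndexEntry(0, 12L, 303L));
        List<IndexEntry> viewScan = List.of(
                new IndexEntry(0, 1L, 101L), new IndexEntry(0, 12L, 303L), new IndexEntry(1, 2L, 999L)); // drifted row
        Map<Integer, List<IndexEntry>> base = buildIndex(baseScan), view = buildIndex(viewScan);
        for (int bucket : base.keySet())
            System.out.println("view bucket " + bucket + " matches: "
                    + sliceMatches(base.get(bucket), view.getOrDefault(bucket, List.of())));
    }
}

The trade-offs Runtian lists above (extra storage, random I/O, and keeping the index in sync with the data) are exactly the costs of maintaining such a structure.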
> > On Sat, May 17, 2025 at 2:42 PM Jon Haddad <j...@rustyrazorblade.com> > wrote: > > Yeah, this is exactly what I suggested in a different part of the thread. > The transformative repair could be done against the local index, and the > local index can repair against the global index. It opens up a lot of > possibilities, query wise, as well. > > > On Sat, May 17, 2025 at 1:47 PM Blake Eggleston <bl...@ultrablake.com> > wrote: > > > > They are not two unordered sets, but rather two sets ordered by > different keys. > > I think you could exploit this to improve your MV repair design. Instead > of taking global snapshots and persisting merkle trees, you could implement > a set of secondary indexes on the base and view tables that you could > quickly compare the contents of for repair. > > The indexes would have their contents sorted by base table partition > order, but segmented by view partition ranges. Then any view <-> base > repair would compare the intersecting index slices. That would allow you to > repair data more quickly and with less operational complexity. > > On Fri, May 16, 2025, at 12:32 PM, Runtian Liu wrote: > > [chart omitted: a grid in which each cell is a Merkle tree covering one (base table range, MV range) pair] > > For example, in the chart above, each cell represents a Merkle tree that > covers data belonging to a specific base table range and a specific MV > range. When we scan a base table range, we can generate the Merkle trees > marked in red. When we scan an MV range, we can generate the Merkle trees > marked in green. The cells that can be compared are marked in blue. > > To save time and CPU resources, we persist the Merkle trees created during > a scan so we don’t need to regenerate them later. This way, when other > nodes scan and build Merkle trees based on the same “frozen” snapshot, we > can reuse the existing Merkle trees for comparison. > > On Fri, May 16, 2025 at 12:22 PM Runtian Liu <curly...@gmail.com> wrote: > > Unfortunately, no. When building Merkle trees for small token ranges in > the base table, those ranges may span the entire MV token range. As a > result, we need to scan the entire MV to generate all the necessary Merkle > trees. For efficiency, we perform this as a single pass over the entire > table rather than scanning a small range of the base or MV table > individually. As you mentioned, with storage becoming increasingly > affordable, this approach helps us save time and CPU resources. > > On Fri, May 16, 2025 at 12:11 PM Jon Haddad <j...@rustyrazorblade.com> > wrote: > > I spoke too soon - endless questions are not over :) > > Since the data that's going to be repaired only covers a range, I wonder > if it makes sense to have the ability to issue a minimalist snapshot that > only hardlinks SSTables that are in a token range. Based on what you > (Runtian) have said above, only a small percentage of the data would > actually be repaired at any given time. > > Just a thought to save a little filesystem churn. > > > On Fri, May 16, 2025 at 10:55 AM Jon Haddad <j...@rustyrazorblade.com> > wrote: > > Never mind about the height thing, I guess it's the same property. > > I’m done for now :) > > Thanks for entertaining my endless questions. My biggest concerns about > repair have been alleviated. > > Jon > > On Fri, May 16, 2025 at 10:34 AM Jon Haddad <j...@rustyrazorblade.com> > wrote: > > That's the critical bit I was missing, thank you Blake. 
> > I guess we’d need to have unlimited height trees then, since you’d need to > be able to update the hashes of individual partitions, and we’d also need > to propagate the hashes up every time as well. I’m curious what the cost > will look like with that. > > At least it’s a CPU problem not an I/O one. > > Jon > > > On Fri, May 16, 2025 at 10:04 AM Blake Eggleston <bl...@ultrablake.com> > wrote: > > > The merkle tree xor's the individual row hashes together, which is > commutative. So you should be able to build a tree in the view token order > while reading in base table token order and vice versa. > > On Fri, May 16, 2025, at 9:54 AM, Jon Haddad wrote: > > Thanks for the explanation, I appreciate it. I think you might still be > glossing over an important point - which I'll make singularly here. > There's a number of things I'm concerned about, but this is a big one. > > Calculating the hash of a partition for a Merkle tree needs to be done on > the fully materialized, sorted partition. > > The examples you're giving are simple, to the point where they hide the > problem. Here's a better example, where the MV has a clustering column. In > the MV's partition it'll have multiple rows, but in the base table it'll be > stored in different pages or different SSTables entirely:
>
> CREATE TABLE test.t1 (
>     id int PRIMARY KEY,
>     v1 int
> );
>
> CREATE MATERIALIZED VIEW test.test_mv AS
>     SELECT v1, id
>     FROM test.t1
>     WHERE id IS NOT NULL AND v1 IS NOT NULL
>     PRIMARY KEY (v1, id)
>     WITH CLUSTERING ORDER BY (id ASC);
>
> Let's say we have some test data:
>
> cqlsh:test> select id, v1 from t1;
>
>  id | v1
> ----+----
>  10 | 11
>   1 | 14
>  19 | 10
>   2 | 14
>   3 | 14
>
> When we transform the data by iterating over the base table, we get this representation (note v1=14):
>
> cqlsh:test> select v1, id from t1;
>
>  v1 | id
> ----+----
>  11 | 10
>  14 | 1 <------
>  10 | 19
>  14 | 2 <------
>  14 | 3 <------
>
> The partition key in the new table is v1. If you simply iterate and transform and calculate merkle trees on the fly, you'll hit v1=14 with id=1, but you'll miss id=2 and id=3. You need to get them all up front, and in sorted order, before you calculate the hash. You actually need to transform the data to this, prior to calculating the tree:
>
>  v1 | id
> ----+----
>  11 | 10
>  14 | 1, 2, 3
>  10 | 19
>
> Without an index you need to do one of the following over a dataset that's hundreds of GB:
>
> * for each partition, scan the entire range for all the data, then sort that partition in memory, then calculate the hash
> * collect the entire dataset in memory, transform and sort it
> * use a local index which has the keys already sorted
>
> A similar problem exists when trying to resolve the mismatches. > > Unless I'm missing some critical detail, I can't see how this will work > without requiring nodes to have hundreds of GB of RAM or we do several orders > of magnitude more I/O than a normal repair. > > Jon > > > > On Thu, May 15, 2025 at 9:09 PM Runtian Liu <curly...@gmail.com> wrote: > > Thank you for the thoughtful questions, Jon. I really appreciate them—let > me go through them one by one. > > * Do you intend on building all the Merkle trees in parallel? > > Since we take a snapshot to "freeze" the dataset, we don’t need to build > all Merkle trees in parallel. > > > * Will there be hundreds of files doing random IO to persist the trees to > disk, in addition to the sequential IO from repair? > > The Merkle tree will only be persisted after the entire range scan is > complete. 
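As an aside, Blake's point about XOR'd row hashes can be made concrete with a toy example (illustrative only: CRC32 stands in for the real row digest, and this is not the actual MerkleTree/Validator code). Because XOR is commutative and associative, the leaf summarizing the MV partition v1=14 comes out the same whether its rows are seen contiguously in view order or interleaved during a base-order scan.

import java.util.zip.CRC32;

// Toy demonstration that an XOR-of-row-hashes leaf is order independent, so a scan
// in base table token order can still populate leaves laid out in view token order.
public class XorLeafSketch {

    // Hash one (viewKey, baseKey) row; CRC32 is a stand-in for the real row digest.
    static long rowHash(int viewKey, int baseKey) {
        CRC32 crc = new CRC32();
        crc.update((viewKey + ":" + baseKey).getBytes());
        return crc.getValue();
    }

    public static void main(String[] args) {
        // Rows from Jon's example, written as (v1, id); the MV partition of interest is v1=14.
        int[][] baseScanOrder = {{14, 1}, {11, 10}, {14, 2}, {10, 19}, {14, 3}};
        int[][] viewScanOrder = {{10, 19}, {11, 10}, {14, 1}, {14, 2}, {14, 3}};

        long leafFromBaseScan = 0, leafFromViewScan = 0;
        for (int[] row : baseScanOrder)
            if (row[0] == 14) leafFromBaseScan ^= rowHash(row[0], row[1]); // rows arrive interleaved
        for (int[] row : viewScanOrder)
            if (row[0] == 14) leafFromViewScan ^= rowHash(row[0], row[1]); // rows arrive contiguously

        System.out.println(leafFromBaseScan == leafFromViewScan); // true: same leaf digest either way
    }
}

The cost of building a tree this way is locating the right leaf for each row, which is the O(d) per-row insertion from the root discussed later in the thread.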
> > > * Is the intention of persisting the trees to disk to recover from > failure, or just to limit memory usage? > > This is primarily to limit memory usage. As you may have noticed, MV > repair needs to coordinate across the entire cluster rather than just a few > nodes. This process may take a very long time, and a node may restart or > perform other operations during that period. > > > * Have you calculated the Merkle tree space requirements? > This is a very good question—I'll add it to the CEP as well. Each leaf > node stores a 32-byte hash. With a tree depth of 15 (which is on the higher > end—smaller datasets might use fewer than 10 levels), a single Merkle tree > would be approximately 32 × 2¹⁵ bytes, or 1 MB. If we split the tokens into > 10 ranges per node, we’ll end up with around 100 Merkle trees per node, > totaling roughly 100 MB. > * When do we build the Merkle trees for the view? Is that happening in > parallel with the base table? Do we have the computational complexity of 2 > full cluster repairs running simultaneously, or does it take twice as long? > > As mentioned earlier, this can be done in parallel with the base table or > after building the base table’s Merkle tree, since we’re using a snapshot > to “freeze” the data. > > > I'm very curious to hear if anyone has run a full cluster repair > recently on a non-trivial dataset. Every cluster I work with only does > subrange repair. I can't even recall the last time I did a full repair on > a large cluster. I may never have, now that I think about it. Every time > I've done this in the past it's been plagued with issues, both in terms of > performance and reliability. Subrange repair works because it can make > progress in 15-30 minute increments. > When we run a full repair, we trigger subrange repair on one node, then > proceed to the next subrange, and continue this way until the node's entire > primary range is repaired. After that, we move on to the next node—correct? > The complexity comparison between full repair and the proposed MV repair is > meant to compare the cost of repairing the entire dataset, not just a > subrange. > > For the example you mentioned, let me explain how it works using the > schema—without needing to create an index to build the Merkle trees. > > Suppose we have a node that owns the token range 1–30, and we have a few > records in the base table and its corresponding MV:
>
> - Base table: (1, 1), (2, 11), (12, 1), (23, 1)
> - MV: (1, 1), (1, 12), (1, 23), (11, 2)
>
> When we run a full repair, we divide the node’s range into subranges of > size 10, so we have r=3 ranges in total. > > First, we repair the range (1–10). The records (1, 1) and (2, 11) fall > into this range and are used to build the first Merkle tree, which is then > compared with the corresponding tree from another replica. > > Next, we repair the range (11–20). Here, the record (12, 1) is used to > build the second Merkle tree. > > Finally, we repair the range (21–30), using the record (23, 1) to build > the third Merkle tree, which is again compared with a replica's version. > > In MV repair, we still use a subrange size of 10. The key difference is > that each Merkle tree is responsible for data not just based on the base > table's partition key, but also on the MV's partition key. > > For example, when scanning the base table over the range (1–10):
>
> - In full repair, we generate one Merkle tree for that subrange.
> - In MV repair, we generate *r* = 3 Merkle trees, one for each MV partition key range.
> > > This means the record (1, 1) will go into the first tree because the MV > partition key is 1, while (2, 11) will go into the second tree because its > MV key is 11. The third tree will be empty because there is no record with > base table key in (1–10) and MV key in (21–30). > > After scanning the base table range (1–10), we proceed to the next range, > (11–20), and again generate 3 Merkle trees, followed by the last range. > This is why the total number of Merkle trees is *r²*—in this case, 9 > trees need to be built for the entire table. > > A similar idea applies when scanning the MV to build Merkle trees. > Essentially, for MV repair, each Merkle tree represents two-dimensional > data, unlike normal repair where it only represents one dimension. Each > Merkle tree represents the data that maps to Range(x) in the base table and > Range(y) in the MV. > > > In full repair, tokens must be sorted when adding to the Merkle tree > because the tree is built from the leaves—records are added sequentially > from left to right. > For MV repair, since the leaf nodes are sorted by the MV partition key, a > base table row can be inserted into any leaf node. This means we must > insert each hash starting from the root instead of directly at the leaf. > As noted in the comparison table, this increases complexity:
>
> - In full repair, Merkle tree building is *O(1)* per row—each hash is added sequentially to the leaf nodes.
> - In MV repair, each hash must be inserted from the root, making it *O(d)* per row.
>
> Since *d* (the tree depth) is typically small—less than 20 and often > smaller than in full repair—this added complexity isn’t a major concern in > practice. The reason it is smaller than full repair is that, with the above > example, we use 3 trees to represent the same amount of data while full > repair uses 1 tree. > > > > > Note that within each leaf node, the order in which hashes are added > doesn’t matter. Cassandra repair currently enforces sorted input only to > ensure that leaf nodes are built from left to right. > > > So let's say we find a mismatch in a hash. That indicates that there's > some range of data where we have an issue. For some token range calculated > from the v1 field, we have a mismatch, right? What do we do with that > information? > With the above example in mind, when we identify a range mismatch, it > means we’ve found that data within the base table primary key range (a–b) > and MV primary key range (m–n) has inconsistencies. We only need to rebuild > this specific data. > This allows us to easily locate the base table node that owns range (a–b) > and rebuild only the affected MV partition key range (m–n). > > * Will there be coordination between all nodes in the cluster to ensure > you don't have to do multiple scans? > > > Yes, coordination is important for this type of repair. With the proposed > solution, we can detect mismatches between the base table and the MV by > scanning data from each of them just once. > However, this doesn't mean all nodes need to be healthy during the repair. > You can think of all the Merkle trees as forming a 2D matrix—if one node is > down, it corresponds to one row and one column being unavailable for > comparison. The remaining cells can still be used for mismatch detection. > > > > Please don’t hesitate to let me know if anything is unclear or if you have > any further questions or concerns—I’d be happy to discuss them. > > > Thanks, > > Runtian
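To summarize the two-dimensional scheme above in code form, here is a deliberately simplified sketch: the names are hypothetical, each cell is a single XOR digest rather than a persisted Merkle tree, CRC32 stands in for the real row hash, and none of this is the CEP-48 implementation. Cell (i, j) covers rows whose base key falls in base subrange i and whose MV key falls in view subrange j, so a mismatching cell pins the rebuild to exactly one (base range, MV range) pair.

import java.util.zip.CRC32;

// Simplified "matrix of Merkle trees": buildMatrix folds a full scan into all r x r
// cells; in the actual proposal each node only fills the rows (base scan) or columns
// (MV scan) for the ranges it owns, and persists a real tree per cell.
public class MvRepairMatrixSketch {
    static final int R = 3;            // number of split ranges per dimension
    static final int RANGE_WIDTH = 10; // key ranges (1-10), (11-20), (21-30)

    static int rangeIndex(int key) { return Math.min((key - 1) / RANGE_WIDTH, R - 1); }

    static long rowHash(int baseKey, int viewKey) {
        CRC32 crc = new CRC32();
        crc.update((baseKey + "->" + viewKey).getBytes());
        return crc.getValue();
    }

    // Fold each (baseKey, viewKey) row into the cell addressed by both dimensions.
    static long[][] buildMatrix(int[][] rows) {
        long[][] cells = new long[R][R];
        for (int[] row : rows)
            cells[rangeIndex(row[0])][rangeIndex(row[1])] ^= rowHash(row[0], row[1]);
        return cells;
    }

    public static void main(String[] args) {
        // Base rows (id, v1) from the example: (1,1), (2,11), (12,1), (23,1).
        int[][] baseRows = {{1, 1}, {2, 11}, {12, 1}, {23, 1}};
        // View rows mapped back to (id, v1); the row for id=12 is missing in the MV.
        int[][] viewRows = {{1, 1}, {2, 11}, {23, 1}};

        long[][] fromBase = buildMatrix(baseRows), fromView = buildMatrix(viewRows);
        for (int i = 0; i < R; i++)
            for (int j = 0; j < R; j++)
                if (fromBase[i][j] != fromView[i][j])
                    System.out.printf("mismatch: base range %d, view range %d%n", i, j);
        // Prints "mismatch: base range 1, view range 0": only base range (11-20)
        // crossed with MV range (1-10) needs to be rebuilt.
    }
}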
> > > > > On Thu, May 15, 2025 at 6:34 PM Jon Haddad <j...@rustyrazorblade.com> > wrote: > > One last thing. I'm pretty sure building the tree requires the keys to be > added in token order: > https://github.com/apache/cassandra/blob/08946652434edbce38a6395e71d4068898ea13fa/src/java/org/apache/cassandra/repair/Validator.java#L173 > > Which definitely introduces a bit of a problem, given that the tree would > be constructed from the transformed v1, which is a value unpredictable > enough to be considered random. > > The only way I can think of to address this would be to maintain a local > index on v1. See my previous email where I mentioned this. > > Base Table -> Local Index -> Global Index > > Still a really hard problem. > > Jon > > > > On Thu, May 15, 2025 at 6:12 PM Jon Haddad <j...@rustyrazorblade.com> > wrote: > > There's a lot here that's still confusing to me. Maybe you can help me > understand it better? Apologies in advance for the text wall :) > > I'll use this schema as an example:
>
> ---------
> CREATE TABLE test.t1 (
>     id int PRIMARY KEY,
>     v1 int
> );
>
> create MATERIALIZED VIEW test_mv as
> SELECT v1, id from test.t1 where id is not null and v1 is not null primary key (v1, id);
> ---------
>
> We've got (id, v1) in the base table and (v1, id) in the MV. > > During the repair, we snapshot, and construct a whole bunch of merkle > trees. CEP-48 says they will be persisted to disk.
>
> * Do you intend on building all the Merkle trees in parallel?
> * Will there be hundreds of files doing random IO to persist the trees to disk, in addition to the sequential IO from repair?
> * Is the intention of persisting the trees to disk to recover from failure, or just to limit memory usage?
> * Have you calculated the Merkle tree space requirements?
> * When do we build the Merkle trees for the view? Is that happening in parallel with the base table? Do we have the computational complexity of 2 full cluster repairs running simultaneously, or does it take twice as long?
>
> I'm very curious to hear if anyone has run a full cluster repair recently > on a non-trivial dataset. Every cluster I work with only does subrange > repair. I can't even recall the last time I did a full repair on a large > cluster. I may never have, now that I think about it. Every time I've > done this in the past it's been plagued with issues, both in terms of > performance and reliability. Subrange repair works because it can make > progress in 15-30 minute increments. > > Anyways - moving on... > > You suggest we read the base table and construct the Merkle trees based on > the transformed rows. Using my schema above, we take the v1 field and use > token(v1), to build the tree. Assuming that a value for v1 appears many > times throughout the dataset across many partitions, how do you intend on > calculating its hash? If you look at Validator.rowHash [1] and > Validator.add, you'll see it's taking an UnfilteredRowIterator for an > entire partition and calculates the hash based on that. Here's the comment:
>
> /**
>  * Called (in order) for every row present in the CF.
>  * Hashes the row, and adds it to the tree being built.
>  *
>  * @param partition Partition to add hash
>  */
> public void add(UnfilteredRowIterator partition)
>
> So it seems to me like you need to have the entire partition materialized > in memory before adding to the tree. 
Doing that per value v1 without an > index is pretty much impossible - we'd have to scan the entire dataset once > per partition to pull out all the matching v1 values, or you'd need to > materialize the entire dataset into a local version of the MV for that > range. I don't know how you could do this. Do you have a workaround for > this planned? Maybe someone that knows the Merkle tree code better can > chime in. > > Maybe there's something else here I'm not aware of - please let me know > what I'm missing here if I am, it would be great to see this in the doc if > you have a solution. > > For the sake of discussion, let's assume we've moved past this and we have > our trees for hundreds of ranges built from the base table & built for the > MV, now we move on to the comparison. > > In the doc at this point, we delete the snapshot because we have the tree > structures and we compare Merkle trees. Then we stream mismatched data. > > So let's say we find a mismatch in a hash. That indicates that there's > some range of data where we have an issue. For some token range calculated > from the v1 field, we have a mismatch, right? What do we do with that > information?
>
> * Do we tell the node that owned the base table - hey, stream the data from base where token(v1) is in range [X,Y) to me?
> * That means we have to scan through the base again for all rows where token(v1) in [X,Y) range, right? Because without an index on the hashes of v1, we're doing a full table scan and hashing every v1 value to find out if it needs to be streamed back to the MV.
> * Are we doing this concurrently on all nodes?
> * Will there be coordination between all nodes in the cluster to ensure you don't have to do multiple scans?
>
> I realize there are a lot of questions here, but unfortunately I'm having a > hard time seeing how we can work around some of the core assumptions around > constructing Merkle trees and using them to resolve the differences in a > way that matches up with what's in the doc. I have quite a few more things > to discuss, but I'll save them for a follow up once all these have been > sorted out. > > Thanks in advance! > Jon > > [1] > https://github.com/apache/cassandra/blob/08946652434edbce38a6395e71d4068898ea13fa/src/java/org/apache/cassandra/repair/Validator.java#L209 > > > > On Thu, May 15, 2025 at 10:10 AM Runtian Liu <curly...@gmail.com> wrote: > > The previous table compared the complexity of full repair and MV repair > when reconciling one dataset with another. In production, we typically use > a replication factor of 3 in one datacenter. This means full repair > involves 3n rows, while MV repair involves comparing 6n rows (base + MV). > Below is an updated comparison table reflecting this scenario.
>
> n: number of rows to repair (total rows in the table)
> d: depth of one Merkle tree for MV repair
> r: number of split ranges
> p: data compacted away
>
> This comparison focuses on the complexities of one round of full repair with a replication factor of 3 versus repairing a single MV based on one base table with replication factor 3.
>
> Extra disk used: Full Repair 0 / MV Repair O(2p)
> Comment: Since we take a snapshot at the beginning of the repair, any disk space that would normally be freed by compaction will remain occupied until the Merkle trees are successfully built and the snapshot is cleared.
>
> Data scan complexity: Full Repair O(3n) / MV Repair O(6n)
> Comment: Full repair scans n rows from the primary and 2n from replicas. MV repair scans 3n rows from the base table and 3n from the MV.
>
> Merkle tree building time complexity: Full Repair O(3n) / MV Repair O(6nd)
> Comment: In full repair, Merkle tree building is O(1) per row—each hash is added sequentially to the leaf nodes. In MV repair, each hash is inserted from the root, making it O(d) per row. Since d is typically small (less than 20 and often smaller than in full repair), this isn’t a major concern.
>
> Total Merkle tree count: Full Repair O(3r) / MV Repair O(6r^2)
> Comment: MV repair needs to generate more, smaller Merkle trees, but this isn’t a concern as they can be persisted to disk during the repair process.
>
> Merkle tree comparison complexity: Full Repair O(3n) / MV Repair O(3n)
> Comment: Assuming one row maps to one leaf node, both repairs are equivalent.
>
> Stream time complexity: