codope commented on code in PR #9537: URL: https://github.com/apache/hudi/pull/9537#discussion_r1310588263
########## rfc/rfc-73/rfc-73.md: ########## @@ -0,0 +1,416 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# RFC-73: Multi-Table Transactions + +## Proposers + +- @codope + +## Approvers + +- @vinothchandar + +## Status + +JIRA: [HUDI-6709](https://issues.apache.org/jira/browse/HUDI-6709) + +## Abstract + +Modern data lake architectures often comprise numerous interconnected tables. Operations, such as data backfill, +upserts, deletes or complex transformations, may span across multiple tables. In these scenarios, it's crucial that +these operations are atomic - i.e., they either succeed across all tables or fail without partial writes. This ensures +data consistency across the entire dataset. Users can design data workflows with the assurance that operations spanning +multiple tables are treated as a single atomic unit. + +## Background + +Hudi has always emphasized transactional guarantees, ensuring data integrity and consistency for a specific table. +Central to Hudi's approach to transactions is its [timeline](https://hudi.apache.org/docs/timeline), which logs all +actions (like commits, deltacommits, rollbacks, etc) on the table. 
With timeline as the source of truth for all changes +on the table, Hudi employs tunable [concurrency control](https://hudi.apache.org/docs/concurrency_control) to allow for +concurrent reads and writes on the table +with [snapshot isolation](https://cwiki.apache.org/confluence/display/HUDI/RFC+-+22+%3A+Snapshot+Isolation+using+Optimistic+Concurrency+Control+for+multi-writers). +This is achieved by leveraging the timeline to determine the latest consistent snapshot of the table. Additionally, +users can bring their own custom conflict resolution strategy by implementing the `ConflictResolutionStrategy` +interface. + +However, the current implementation cannot be extended as-is to support atomic operations across multiple tables. First +of all, we need a notion of a "database" and its tables to be able to associate a transaction with multiple tables. +Secondly, Hudi's timeline would need to evolve to account for changes across multiple tables. This introduces +complexities in tracking and managing the order of operations across tables. With multiple tables involved, the points +of potential failures increase. A failure in one table can cascade and affect the entire transaction. In case of +failures, rolling back changes becomes more intricate in multi-table scenarios. Ensuring data consistency across tables +during rollbacks, especially when there are inter-table dependencies, introduces additional challenges. Finally, the +current concurrency control implementation is not designed to handle multi-table transactions. Hence, this RFC proposes +a new design to support multi-table transactions. + +## Design + +First of all, let us discuss the goals and non-goals which will help in understanding the design considerations better. + +### Goals + +1. **Atomicity Across Tables:** Ensure that a set of changes spanning multiple tables either fully completes or fully + rolls back. No partial commits should occur. +2. 
**Consistency:** Ensure that, after every transaction, all involved tables are in a consistent state. Avoid dirty + reads, non-repeatable reads, handle read-write and write-write conflicts. +3. **Isolation:** Multiple concurrent transactions should not interfere with each other. One transaction shouldn't see + the intermediate states of another ongoing transaction. +4. **Durability:** Once a multi-table transaction is committed, the changes should be permanent and recoverable, even + after system failures. +5. **Performance:** The multi-table transaction mechanism should introduce minimal overhead. Its performance impact on + typical Hudi operations should be kept low. By corollary, locking (in case of OCC) cannot be coarser than file level. +6. **Monitoring:** Integration with tools such as hudi-cli to track the status and health of transactions. Separate + transaction metrics. Also, provide comprehensive logs for debugging transaction failures or issues. +7. **Integration with Current Hudi Features:** The multi-table transaction should be seamlessly integrated with existing + Hudi features and should not break existing APIs or workflows. In + particular, `TransactionManager`, `LockManager`, `ConflictResolutionStrategy` APIs should work as they do today with + a single table. +8. **Configurability:** Allow users to configure the behavior of multi-table transactions, e.g., concurrency control + mechanism, timeout durations, etc. +9. **Recovery**: Rollbacks, savepoint and restore should be supported. + +### Non-goals + +1. **Cross-Database Transactions:** Transactions spanning multiple databases might be too complex as an initial target. +2. **Granular Record Locking:** Instead of locking at the granularity of individual records, coarse-grained locking ( + like at the file level) might be more practical to start with. But, let's avoid table or partition level locking as + mentioned in the goals. +3.
**Distributed Transactions:** We are not going to consider transactions in the sense of distributed databases, and + thus the need of a protocol such as 2PC or a quorum mechanism to proceed with the commits. +4. **Complex Conflict Resolution:** Initially, simple conflict resolution strategies (like aborting a transaction on + conflict) can be implemented, leaving more sophisticated strategies for future iterations. +5. **Replication/CDC stream:** We are not going to build a replication or CDC stream out of the database level commit + log, though that would be a good usage of the database timeline introduced in this RFC. + +### Design + +Our primary challenge is to support operations that span multiple tables within a single database, and maintain the ACID +properties across these tables. + +1. Need for a catalog: Do we need a catalog API that tracks databases and its tables? +2. Need for a transaction coordinator: A centralized coordinator that will manage transaction logs, track ongoing + transactions, handle timeouts, and manage transaction rollbacks. +3. Need for a transaction log: At the table level, the timeline incorporating state of an action with start and modified + time serves the purpose of transaction log. At the database level, we need to track all multi-table transactions. + Every start, update, or commit of a multi-table transaction gets recorded (Database timeline?). +4. Locking/Conflict resolution mechanism: Lock the affected files during the multi-table transaction to prevent + conflicting writes. Decide on conflict resolution strategies (e.g., last write wins, version vectors). +5. Need for buffer management: Writes by a transaction are not immediately visible to other transactions. They are + buffered until the transaction decides to commit. In-memory vs durable buffer (reuse `.temp` dir)? + +#### Concurrency Control and Conflict Resolution + +Today we have an event log at the table level in the form of Hudi timeline. 
To support multi-table transactions, we are +going to need a unified view of all tables within the database to ensure atomicity and consistency. Hence, we propose a +database-level timeline as transaction log, which mainly contains the following: + +* Transaction ID +* Tables involved +* Type of operation (e.g., write, delete) +* Start Timestamp +* End/Modified Timestamp +* State of transaction: REQUESTED, INFLIGHT, COMPLETED, ROLLED\_BACK +* Any other relevant metadata + +Hudi naturally supports MVCC based snapshot isolation. We can leverage the Hudi table and database timeline to support +snapshot isolation with multi-table transactions with concurrent readers and writers. + +Anomalies or Conflicts + +With MVCC and snapshot isolation: + +1. **Start of Transaction**: When a transaction begins, it's given a "snapshot" of the database as it appeared at the + start time. +2. **Reads**: The transaction can read from its snapshot without seeing any intermediate changes made by other + transactions. +3. **Writes**: The writes are made to a new "version" of the data. This doesn't affect other ongoing transactions as + they continue to work with their respective snapshots. +4. **Commit**: During commit, the system checks for write-write conflicts, i.e., if another transaction has committed + changes to the same data after the current transaction's start time. If there's a conflict, the current transaction + may be aborted. + +#### 1\. Dirty Reads: + +**Definition**: A transaction reads data that's been written by a still in-progress transaction. + +**Analysis with MVCC**: Dirty reads are inherently prevented in MVCC with snapshot isolation. Since every transaction is +reading from its snapshot, it won't see uncommitted changes made by other transactions. + +#### 2\. Phantom Reads: + +**Definition**: In the course of a transaction, new records get added or old records get removed, which fit the criteria +of a previous read in the transaction. 
+ +**Analysis with MVCC**: Since transactions operate on snapshots, they won't see new records added after the snapshot was +taken. However, if a transaction's intent is to ensure that new records of a certain kind don't get added, additional +mechanisms, like predicate locks, might be needed. If we don't allow snapshot to be refreshed within the same +transaction, then phantom reads seem improbable (including self-join). + +* Textbook phantom read example + + Suppose you have a table named `Employees` with columns `employee_id` and `manager_id`, where the `manager_id` for + some employees refers to the `employee_id` of other employees in the same table. One might execute a self-join to + retrieve a list of employees and their managers: + + ```plain + SELECT e1.employee_id, e2.employee_id AS manager_id + FROM Employees e1 + JOIN Employees e2 ON e1.manager_id = e2.employee_id; + ``` + + This query essentially matches each employee with their manager, using a self-join on the `Employees` table. + + 1. **Transaction A** starts and runs the self-join query to get a list of employees and their respective managers. + 2. While **Transaction A** is still in progress, **Transaction B** starts and adds a new row to the `Employees` + table, inserting a new employee with a manager whose `employee_id` is already in the table. + 3. **Transaction B** commits its changes. + 4. If **Transaction A** re-runs the same self-join query, it might see the newly added row, resulting in an + additional result in the join output that wasn't there during the initial query. This is a phantom read. + +With MVCC and snapshot isolation level, a transaction would continue to see the state of the database as it was when the +transaction started, even if it re-runs the self-join. This level will prevent the phantom read in this case. However, +it cannot be guaranteed with read committed. + +#### 3\. 
Read-Write Conflict: + +**Definition**: Given a scenario of a concurrent read happening while the transactions are ongoing: A transaction reads +multiple items, but by the time it finishes reading, another transaction has modified some of the earlier-read items, +resulting in an inconsistent view. + +**Analysis with MVCC**: The data that a transaction reads remains consistent for the duration of the transaction. With +MVCC and snapshot isolation, the read operation will see a consistent snapshot of the database, depending on when the +read started. It won't see the uncommitted changes made by the ongoing transactions. So, the read is consistent, and +there's no anomaly related to the read operation in this setup. + +#### 4\. Write-Write Conflict: + +**Definition**: Two transactions read the same data, and based on the read, they both decide to modify the data, leading +to a situation where the final outcome might not be consistent with the intent of either transaction. + +**Analysis with MVCC**: This is a **potential problem** even with snapshot isolation. Let's say two transactions, T1 and +T2, start at the same time and read the same file in one of the tables. Both decide to modify the file based on what +they've read. Since they are operating on snapshots, neither transaction sees the other's changes. When they try to +commit, they might both try to create a new version of the file, leading to a write-write conflict. The system, noticing +the conflict, would typically abort one of the transactions or apply some conflict resolution strategy. + +#### Conflict Resolution + +Conflict detection will happen on the basis of the set of partitions and fileIDs mutated by the transaction. Let's say there +are two conflicting transactions, T1 and T2. Both of them, when they start, fetch the latest versions of all tables and +register themselves with a start timestamp in the database timeline, which will be greater than all versions of tables +involved in T1 and T2.
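As a rough illustration of the detection step described above, the sketch below models each transaction as a start timestamp plus the set of (table, partition, fileId) triples it mutated, and reports a conflict when another transaction committed after our start and touched any of the same file groups. The names `Txn`, `FileGroup` and `has_conflict` are hypothetical and are not Hudi APIs; timestamps are plain integers for simplicity.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, Optional, Tuple

# Hypothetical key for a mutated file group: (table, partition path, fileId).
FileGroup = Tuple[str, str, str]


@dataclass(frozen=True)
class Txn:
    """Illustrative stand-in for one entry in a database-level timeline."""
    txn_id: str
    start_ts: int
    mutated: FrozenSet[FileGroup]
    end_ts: Optional[int] = None  # None while the transaction is not committed


def has_conflict(current: Txn, others: Iterable[Txn]) -> bool:
    """Return True if a transaction that committed after `current` started
    mutated at least one file group that `current` also mutated."""
    for other in others:
        if other.txn_id == current.txn_id or other.end_ts is None:
            continue  # skip ourselves and uncommitted transactions
        committed_after_start = other.end_ts > current.start_ts
        if committed_after_start and current.mutated & other.mutated:
            return True
    return False


# T1 has already committed a change to file group f1 of `orders`.
t1 = Txn("T1", 100, frozenset({("orders", "2023/08/01", "f1")}), end_ts=120)
# T2 started at the same time and touches the same file group plus another table.
t2 = Txn("T2", 100, frozenset({("orders", "2023/08/01", "f1"), ("users", "us", "f7")}))
# T3 started at the same time but touches a disjoint file group.
t3 = Txn("T3", 100, frozenset({("users", "eu", "f9")}))
```

Here `has_conflict(t2, [t1])` reports a conflict because both mutate the same file group in `orders`, while `has_conflict(t3, [t1])` does not. Which transaction is then aborted or retried is a separate policy decision.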
To handle conflict before committing, we have the following options: + +1. **First committer/Younger transaction wins** - Essentially, no resolution required with end timestamp based ordering. + If a transaction tries to modify data that has been modified by a younger transaction (with a later timestamp), the + older transaction is rolled back to avoid the conflict. This ensures that transactions are serialized based on their + timestamps. + 1. Pros: Transactions are serialized in a consistent order. Works well in environments with low contention. + 2. Cons: Have to maintain end timestamp for each action. Potentially high abort rates in high contention + environments. + +2. **OCC (with lock provider)** - With start and end timestamp, we have the option of "wait-die" and " + wound-wait" ([CMU notes on 2PL section 3](https://15445.courses.cs.cmu.edu/fall2022/notes/16-twophaselocking.pdf)) + strategies. In the Wait-Die strategy, if an older transaction requests a lock held by a younger one, the older + transaction waits; if a younger transaction requests a lock held by an older one, the younger transaction is + aborted ("dies"). In the Wound-Wait scheme, if an older transaction requests a lock held by a younger one, the younger + transaction is aborted ("wounded") to allow the older transaction to proceed; a younger requester waits. + 1. Pros: Provides a mechanism to prioritize older transactions. + 2. Cons: Complexity in implementation and potentially high waiting time if there are frequent short-running + transactions. + +3. [Compensating transaction](https://learn.microsoft.com/en-us/azure/architecture/patterns/compensating-transaction) - + Instead of aborting a transaction when a conflict is detected, another transaction is executed based on the latest + snapshot to compensate for the effects of the conflicting transaction. + 1. Pros: Avoids the need to abort and retry. Could be useful for long-running transactions. + 2. Cons: Complexity in designing and ensuring the correctness of compensating transactions. + +## Implementation + +### **SQL Writes** Review Comment: Got it.
The BEGIN..END is very similar to what we have in MySQL - https://dev.mysql.com/doc/refman/8.0/en/begin-end.html -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
