codope commented on code in PR #9537:
URL: https://github.com/apache/hudi/pull/9537#discussion_r1310588263


##########
rfc/rfc-73/rfc-73.md:
##########
@@ -0,0 +1,416 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# RFC-73: Multi-Table Transactions
+
+## Proposers
+
+- @codope
+
+## Approvers
+
+- @vinothchandar
+
+## Status
+
+JIRA: [HUDI-6709](https://issues.apache.org/jira/browse/HUDI-6709)
+
+## Abstract
+
+Modern data lake architectures often comprise numerous interconnected tables. Operations such as data backfill, upserts, deletes, or complex transformations may span multiple tables. In these scenarios, it's crucial that
+these operations are atomic - i.e., they either succeed across all tables or 
fail without partial writes. This ensures
+data consistency across the entire dataset. Users can design data workflows 
with the assurance that operations spanning
+multiple tables are treated as a single atomic unit.
+
+## Background
+
+Hudi has always emphasized transactional guarantees, ensuring data integrity 
and consistency for a specific table.
+Central to Hudi's approach to transactions is its 
[timeline](https://hudi.apache.org/docs/timeline), which logs all
+actions (like commits, deltacommits, rollbacks, etc.) on the table. With the timeline as the source of truth for all changes
+on the table, Hudi employs tunable [concurrency 
control](https://hudi.apache.org/docs/concurrency_control) to allow for
+concurrent reads and writes on the table
+with [snapshot 
isolation](https://cwiki.apache.org/confluence/display/HUDI/RFC+-+22+%3A+Snapshot+Isolation+using+Optimistic+Concurrency+Control+for+multi-writers).
+This is achieved by leveraging the timeline to determine the latest consistent 
snapshot of the table. Additionally,
+users can bring their own custom conflict resolution strategy by implementing 
the `ConflictResolutionStrategy`
+interface.
+
+However, the current implementation cannot be extended as-is to support atomic 
operations across multiple tables. First
+of all, we need a notion of a "database" and its tables to be able to 
associate a transaction with multiple tables.
+Secondly, Hudi's timeline would need to evolve to account for changes across 
multiple tables. This introduces
+complexities in tracking and managing the order of operations across tables. 
With multiple tables involved, the points
+of potential failures increase. A failure in one table can cascade and affect 
the entire transaction. In case of
+failures, rolling back changes becomes more intricate in multi-table 
scenarios. Ensuring data consistency across tables
+during rollbacks, especially when there are inter-table dependencies, 
introduces additional challenges. Finally, the
+current concurrency control implementation is not designed to handle 
multi-table transactions. Hence, this RFC proposes
+a new design to support multi-table transactions.
+
+## Design
+
+First of all, let us discuss the goals and non-goals which will help in 
understanding the design considerations better.
+
+### Goals
+
+1. **Atomicity Across Tables:** Ensure that a set of changes spanning multiple 
tables either fully completes or fully
+   rolls back. No partial commits should occur.
+2. **Consistency:** Ensure that, after every transaction, all involved tables are in a consistent state. Avoid dirty reads and non-repeatable reads, and handle read-write and write-write conflicts.
+3. **Isolation:** Multiple concurrent transactions should not interfere with 
each other. One transaction shouldn't see
+   the intermediate states of another ongoing transaction.
+4. **Durability:** Once a multi-table transaction is committed, the changes 
should be permanent and recoverable, even
+   after system failures.
+5. **Performance:** The multi-table transaction mechanism should introduce 
minimal overhead. Its performance impact on
+   typical Hudi operations should be kept low. By corollary, locking (in case 
of OCC) cannot be coarser than file level.
+6. **Monitoring:** Integration with tools such as hudi-cli to track the status 
and health of transactions. Separate
+   transaction metrics. Also, provide comprehensive logs for debugging 
transaction failures or issues.
+7. **Integration with Current Hudi Features:** The multi-table transaction 
should be seamlessly integrated with existing
+   Hudi features and should not break existing APIs or workflows. In
+   particular, the `TransactionManager`, `LockManager`, and `ConflictResolutionStrategy` APIs should work as they do today with a single table.
+8. **Configurability:** Allow users to configure the behavior of multi-table 
transactions, e.g., concurrency control
+   mechanism, timeout durations, etc.
+9. **Recovery**: Rollbacks, savepoint and restore should be supported.
+
+### Non-goals
+
+1. **Cross-Database Transactions:** Transactions spanning multiple databases 
might be too complex as an initial target.
+2. **Granular Record Locking:** Instead of locking at the granularity of individual records, coarse-grained locking (like at the file level) might be more practical to start with. However, let's avoid table- or partition-level locking, as mentioned in the goals.
+3. **Distributed Transactions:** We are not going to consider transactions in the sense of distributed databases, and thus we avoid the need for a protocol such as 2PC or a quorum mechanism to proceed with the commits.
+4. **Complex Conflict Resolution:** Initially, simple conflict resolution 
strategies (like aborting a transaction on
+   conflict) can be implemented, leaving more sophisticated strategies for 
future iterations.
+5. **Replication/CDC stream:** We are not going to build a replication or CDC 
stream out of the database level commit
+   log, though that would be a good usage of the database timeline introduced 
in this RFC.
+
+### Design Considerations
+
+Our primary challenge is to support operations that span multiple tables within a single database while maintaining the ACID properties across these tables. This raises the following design questions:
+
+1. Need for a catalog: Do we need a catalog API that tracks databases and their tables?
+2. Need for a transaction coordinator: A centralized coordinator that will 
manage transaction logs, track ongoing
+   transactions, handle timeouts, and manage transaction rollbacks.
+3. Need for a transaction log: At the table level, the timeline, which records the state of each action along with its start and modified times, already serves as the transaction log. At the database level, we need to track all multi-table transactions.
+   Every start, update, or commit of a multi-table transaction gets recorded 
(Database timeline?).
+4. Locking/Conflict resolution mechanism: Lock the affected files during the 
multi-table transaction to prevent
+   conflicting writes. Decide on conflict resolution strategies (e.g., last 
write wins, version vectors).
+5. Need for buffer management: Writes by a transaction are not immediately 
visible to other transactions. They are
+   buffered until the transaction decides to commit. In-memory vs durable 
buffer (reuse `.temp` dir)?
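
For the durable-buffer option, here is a minimal sketch (in Python, with hypothetical names; Hudi's actual `.temp`/marker handling differs) of staging a transaction's writes and publishing them only at commit, so other transactions never observe partial writes:

```python
import os
import shutil
import tempfile

def staged_commit(writes, table_dir):
    """Durable-buffer sketch: write files into a '.temp'-style staging
    directory, then publish them into the table directory only at commit.
    On failure, the staging directory is discarded (rollback)."""
    staging = tempfile.mkdtemp(dir=table_dir, prefix=".temp-")
    try:
        # Buffer: writes land in the staging dir, invisible to readers.
        for name, data in writes.items():
            with open(os.path.join(staging, name), "w") as f:
                f.write(data)
        # "Commit": move staged files into the table directory.
        for name in writes:
            os.replace(os.path.join(staging, name), os.path.join(table_dir, name))
    except Exception:
        shutil.rmtree(staging)  # rollback: discard buffered writes
        raise
    else:
        shutil.rmtree(staging, ignore_errors=True)
```

Note this per-file publish is not atomic across files by itself; in the actual design, visibility would be gated by the database-timeline commit, not by file moves.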
+
+#### Concurrency Control and Conflict Resolution
+
+Today we have an event log at the table level in the form of the Hudi timeline. To support multi-table transactions, we are going to need a unified view of all tables within the database to ensure atomicity and consistency. Hence, we propose a database-level timeline as the transaction log, which mainly contains the following:
+
+* Transaction ID
+* Tables involved
+* Type of operation (e.g., write, delete)
+* Start Timestamp
+* End/Modified Timestamp
+* State of transaction: REQUESTED, INFLIGHT, COMPLETED, ROLLED\_BACK
+* Any other relevant metadata
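
As an illustration only (field names and types are assumptions, not a finalized schema), one such database-timeline entry could be modeled as:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional

class TxnState(Enum):
    """States of a multi-table transaction in the database timeline."""
    REQUESTED = "requested"
    INFLIGHT = "inflight"
    COMPLETED = "completed"
    ROLLED_BACK = "rolled_back"

@dataclass
class DatabaseTimelineEntry:
    """Hypothetical record for one multi-table transaction."""
    txn_id: str
    tables: List[str]              # tables involved
    operation: str                 # type of operation, e.g. "write", "delete"
    start_ts: int                  # start timestamp
    end_ts: Optional[int] = None   # end/modified timestamp, set on completion
    state: TxnState = TxnState.REQUESTED
    metadata: dict = field(default_factory=dict)  # any other relevant metadata
```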
+
+Hudi naturally supports MVCC based snapshot isolation. We can leverage the 
Hudi table and database timeline to support
+snapshot isolation with multi-table transactions with concurrent readers and 
writers.
+
+Anomalies or Conflicts
+
+With MVCC and snapshot isolation:
+
+1. **Start of Transaction**: When a transaction begins, it's given a 
"snapshot" of the database as it appeared at the
+   start time.
+2. **Reads**: The transaction can read from its snapshot without seeing any 
intermediate changes made by other
+   transactions.
+3. **Writes**: The writes are made to a new "version" of the data. This 
doesn't affect other ongoing transactions as
+   they continue to work with their respective snapshots.
+4. **Commit**: During commit, the system checks for write-write conflicts, 
i.e., if another transaction has committed
+   changes to the same data after the current transaction's start time. If 
there's a conflict, the current transaction
+   may be aborted.
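
Step 4 above can be sketched as a simple commit-time check (the shape is hypothetical; real conflict detection would consult the timeline and file-level metadata):

```python
def has_write_write_conflict(txn_start_ts, txn_writes, committed_txns):
    """Commit-time write-write check under snapshot isolation: a conflict
    exists if any transaction that committed after this transaction's start
    time wrote to any of the same files.

    txn_writes: set of file identifiers written by the committing transaction.
    committed_txns: list of {"commit_ts": int, "writes": set} records.
    """
    for other in committed_txns:
        if other["commit_ts"] > txn_start_ts and other["writes"] & txn_writes:
            return True
    return False
```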
+
+#### 1\. Dirty Reads:
+
+**Definition**: A transaction reads data that's been written by a still 
in-progress transaction.
+
+**Analysis with MVCC**: Dirty reads are inherently prevented in MVCC with 
snapshot isolation. Since every transaction is
+reading from its snapshot, it won't see uncommitted changes made by other 
transactions.
+
+#### 2\. Phantom Reads:
+
+**Definition**: In the course of a transaction, new records get added or old 
records get removed, which fit the criteria
+of a previous read in the transaction.
+
+**Analysis with MVCC**: Since transactions operate on snapshots, they won't see new records added after the snapshot was taken. However, if a transaction's intent is to ensure that new records of a certain kind don't get added, additional mechanisms, like predicate locks, might be needed. If we don't allow the snapshot to be refreshed within the same transaction, then phantom reads cannot occur (including in self-joins).
+
+* Textbook phantom read example
+
+  Suppose you have a table named `Employees` with columns `employee_id` and 
`manager_id`, where the `manager_id` for
+  some employees refers to the `employee_id` of other employees in the same 
table. One might execute a self-join to
+  retrieve a list of employees and their managers:
+
+  ```plain
+  SELECT e1.employee_id, e2.employee_id AS manager_id
+  FROM Employees e1
+  JOIN Employees e2 ON e1.manager_id = e2.employee_id;
+  ```
+
+  This query essentially matches each employee with their manager, using a 
self-join on the `Employees` table.
+
+    1. **Transaction A** starts and runs the self-join query to get a list of 
employees and their respective managers.
+    2. While **Transaction A** is still in progress, **Transaction B** starts 
and adds a new row to the `Employees`
+       table, inserting a new employee with a manager whose `employee_id` is 
already in the table.
+    3. **Transaction B** commits its changes.
+    4. If **Transaction A** re-runs the same self-join query, it might see the 
newly added row, resulting in an
+       additional result in the join output that wasn't there during the 
initial query. This is a phantom read.
+
+With MVCC and the snapshot isolation level, a transaction would continue to see the state of the database as it was when the transaction started, even if it re-runs the self-join. This isolation level prevents the phantom read in this case. However, that cannot be guaranteed under read committed.
+
+#### 3\. Read-Write Conflict:
+
+**Definition**: Given a scenario of a concurrent read happening while the 
transactions are ongoing: A transaction reads
+multiple items, but by the time it finishes reading, another transaction has 
modified some of the earlier-read items,
+resulting in an inconsistent view.
+
+**Analysis with MVCC**: The data that a transaction reads remains consistent for the duration of the transaction. With MVCC and snapshot isolation, the read operation sees a consistent snapshot of the database as of the transaction's start time. It won't see uncommitted changes made by ongoing transactions. So, the read is consistent, and there's no anomaly related to the read operation in this setup.
+
+#### 4\. Write-Write Conflict:
+
+**Definition**: Two transactions read the same data, and based on the read, 
they both decide to modify the data, leading
+to a situation where the final outcome might not be consistent with the intent 
of either transaction.
+
+**Analysis with MVCC**: This is a **potential problem** even with snapshot 
isolation. Let's say two transactions, T1 and
+T2, start at the same time and read the same file in one of the tables. Both 
decide to modify the file based on what
+they've read. Since they are operating on snapshots, neither transaction sees 
the other's changes. When they try to
+commit, they might both try to create a new version of the file, leading to a 
write-write conflict. The system, noticing
+the conflict, would typically abort one of the transactions or apply some 
conflict resolution strategy.
+
+#### Conflict Resolution
+
+Conflict detection will happen on the basis of the sets of partitions and fileIDs mutated by the transaction. Let's say there are two conflicting transactions, T1 and T2. Both of them, when they start, fetch the latest versions of all tables and register themselves with a start timestamp in the database timeline, which will be greater than all versions of the tables involved in T1 and T2. To handle conflicts before committing, we have the following options:
+
+1. **First committer/Younger transaction wins** - Essentially, no resolution is required with end-timestamp-based ordering. If a transaction tries to modify data that has already been modified by a younger transaction (one with a later timestamp), the older transaction is rolled back to avoid the conflict. This ensures that transactions are serialized based on their timestamps.
+    1. Pros: Transactions are serialized in a consistent order. Works well in 
environments with low contention.
+    2. Cons: Have to maintain end timestamp for each action. Potentially high 
abort rates in high contention
+       environments.
+
+2. **OCC (with lock provider)** - With start and end timestamps, we have the option of the "wait-die" and "wound-wait" ([CMU notes on 2PL, section 3](https://15445.courses.cs.cmu.edu/fall2022/notes/16-twophaselocking.pdf)) strategies. In the Wait-Die strategy, an older transaction requesting a lock held by a younger one waits, while a younger transaction requesting a lock held by an older one is aborted ("dies"). In the Wound-Wait scheme, the younger transaction is aborted ("wounded") to allow the older transaction to proceed.
+    1. Pros: Provides a mechanism to prioritize older transactions.
+    2. Cons: Complexity in implementation and potentially high waiting time if 
there are frequent short-running
+       transactions.
+
+3. [Compensating transaction](https://learn.microsoft.com/en-us/azure/architecture/patterns/compensating-transaction) - Instead of aborting a transaction when a conflict is detected, another transaction is executed based on the latest snapshot to compensate for the effects of the conflicting transaction.
+    1. Pros: Avoids the need to abort and retry. Could be useful for 
long-running transactions.
+    2. Cons: Complexity in designing and ensuring the correctness of 
compensating transactions.
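
As a rough sketch of option 1, conflict detection over the mutated partition/fileID sets with end-timestamp ordering could look like this (names are illustrative, not an actual Hudi API):

```python
def resolve_first_committer_wins(t1, t2):
    """First-committer-wins sketch. Each transaction is a dict with
    'end_ts' (commit timestamp) and 'files' (set of (partition, file_id)
    pairs it mutated). Returns the transaction to roll back, or None if
    the mutated file sets don't overlap and both can commit."""
    if not (t1["files"] & t2["files"]):
        return None  # disjoint writes: no conflict
    # Overlap: the transaction that finished later loses and is rolled back.
    return t1 if t1["end_ts"] > t2["end_ts"] else t2
```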
+
+## Implementation
+
+### **SQL Writes**

Review Comment:
   Got it. The BEGIN..END is very similar to what we have in MySQL - 
https://dev.mysql.com/doc/refman/8.0/en/begin-end.html



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
