danny0405 commented on code in PR #7907:
URL: https://github.com/apache/hudi/pull/7907#discussion_r1322324393


##########
rfc/rfc-66/rfc-66.md:
##########
@@ -0,0 +1,119 @@
+# RFC-66: Lockless Multi Writer
+
+## Proposers
+- @danny0405
+- @ForwardXu
+- @SteNicholas
+
+## Approvers
+-
+
+## Status
+
+JIRA: [Lockless multi writer 
support](https://issues.apache.org/jira/browse/HUDI-5672)
+
+## Abstract
+Hudi already supports basic OCC (optimistic concurrency control) with a rich 
set of lock providers.
+But for multiple streaming ingestion writers, OCC does not work well because 
conflicts happen at a very high frequency.
+To expand on this: with a hashing index, all the writers use a deterministic 
hashing algorithm to distribute the records by primary key,
+so the keys are evenly distributed across all the data buckets. For a single 
data flush in one writer, almost all the data buckets are appended with new 
inputs,
+so conflicts are very likely with multiple writers because almost all the 
data buckets are being written by multiple writers at the same time.
+For the bloom filter index, things are different, but remember that we have a 
small-file load rebalance strategy that writes into the **small** buckets with 
higher priority,
+which means multiple writers are prone to write into the same **small** 
buckets at the same time; that's how conflicts happen.
+
+In general, for multiple streaming writers, explicit locking is not practical 
in production. In this RFC, we propose a lockless solution for streaming 
ingestion.
+
+## Background
+
+Streaming jobs are naturally suitable for data ingestion: they have no 
pipeline orchestration complexity and a smoother write workload.
+Most of the raw data sets we handle today are generated continuously in a 
streaming fashion.
+
+Based on that, many requests for multi-writer ingestion have emerged. With 
multi-writer ingestion, several streams of events with the same schema can be 
drained into one Hudi table,
+and the Hudi table effectively becomes a UNION table view of all the input 
data sets. This is a very common use case because in reality, the data sets are 
usually scattered across data sources.
+
+Another very useful use case we want to unlock is the real-time data set 
join. One of the biggest pain points in streaming computation is the data set 
join:
+an engine like Flink has basic support for all kinds of SQL JOINs, but it 
stores the input records within its internal state backend, which is a huge 
cost for a pure data join with no additional computation.
+In [HUDI-3304](https://issues.apache.org/jira/browse/HUDI-3304), we introduced 
a `PartialUpdateAvroPayload`; in combination with the lockless multi-writer,
+we can implement N-way data source joins in real time! Hudi would take care 
of the payload join during the compaction service procedure.
+
+## Design
+
+### The Precondition
+
+#### MOR Table Type Is Required
+
+The table type must be `MERGE_ON_READ`, so that we can defer the conflict 
resolution to the compaction phase. The compaction service would resolve the 
conflicts of the same keys by respecting the event time sequence of the events.
+
+#### Deterministic Bucketing Strategy
+
+A deterministic bucketing strategy is required, because the same record keys 
from different writers must be distributed into the same bucket, not 
only for UPSERTs, but also for all the new INSERTs.
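
A minimal sketch of what "deterministic" means here, assuming a hypothetical `bucket_id` helper and bucket count. It is not Hudi's actual hashing function; it only illustrates that the mapping must depend on nothing but the record key and the bucket count, so that independent writers agree.

```python
import hashlib

NUM_BUCKETS = 4  # hypothetical bucket count, fixed for the table


def bucket_id(record_key: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Map a record key to a bucket using a process-independent hash."""
    # Python's built-in hash() is salted per process, so a stable digest
    # is used instead to keep the mapping identical across writers.
    digest = hashlib.md5(record_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_buckets


# Two independent writers compute the same bucket for the same key,
# regardless of the order in which the keys arrive.
writer_a = {key: bucket_id(key) for key in ["id-1", "id-2", "id-3"]}
writer_b = {key: bucket_id(key) for key in ["id-3", "id-2", "id-1"]}
```

Because the bucket depends only on the key, a new INSERT from one writer and a later UPSERT from another writer land in the same file group.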
+
+#### Lazy Cleaning Strategy
+
+Configure the cleaning strategy as lazy so that pending instants are not 
rolled back by the other active writers.
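
The three preconditions could be expressed as writer options roughly as follows. This is a sketch, assuming Spark datasource style option keys (Flink uses different option names) and an illustrative bucket count; the RFC itself does not prescribe these exact settings.

```python
# Hypothetical writer options covering the three preconditions.
# The values (the bucket count in particular) are illustrative
# assumptions, not recommendations.
hudi_options = {
    # MOR table type, so conflict resolution is deferred to compaction.
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    # Bucket index as one deterministic bucketing strategy.
    "hoodie.index.type": "BUCKET",
    "hoodie.bucket.index.num.buckets": "4",
    # Lazy cleaning of failed writes, so pending instants of other
    # active writers are not rolled back.
    "hoodie.cleaner.policy.failed.writes": "LAZY",
}
```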
+
+### Basic Work Flow
+
+#### Writing Log Files Separately In Sequence
+
+Basically, each writer flushes its log files in sequence, and the log file 
rolls over to a new version number.
+A pivotal point to note here is that we need to make the write_token unique 
for log files with the same version and the same base instant time,
+so that the file names do not conflict among the writers.
+
+The log files generated by a single writer still preserve their sequence by 
version number, which is important if the natural order is needed for single 
writer events.
+
+![multi-writer](multi_writer.png)
+
+### The Compaction Procedure
+
+The compaction service is the component that actually resolves the conflicts. 
Within a file group, it sorts the files and then merges all the record payloads 
for a record key.
+The event time sequence is respected by combining the payloads using the event 
time field provided by the payload (known as the `preCombine` field in Hudi).
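
The merge can be sketched as below, assuming hypothetical record tuples and a simple "latest event time wins" rule. Real payload classes (for example a partial update payload) may combine fields instead of overwriting; this only illustrates that ordering comes from the event time field rather than from which writer wrote last.

```python
from typing import Dict, Iterable, List, Tuple

# (record_key, event_time, payload); all field names are hypothetical.
Record = Tuple[str, int, dict]


def merge_file_group(
    log_files: Iterable[List[Record]],
) -> Dict[str, Tuple[int, dict]]:
    """Keep, for each record key, the payload with the largest event time."""
    merged: Dict[str, Tuple[int, dict]] = {}
    for log_file in log_files:
        for key, event_time, payload in log_file:
            current = merged.get(key)
            # On equal event times the later-read record wins (an assumption).
            if current is None or event_time >= current[0]:
                merged[key] = (event_time, payload)
    return merged


# Log files from two writers for the same file group.
writer_1_log = [("k1", 10, {"v": "a"}), ("k2", 11, {"v": "b"})]
writer_2_log = [("k1", 12, {"v": "c"}), ("k2", 9, {"v": "d"})]
result = merge_file_group([writer_1_log, writer_2_log])
```

Here `k1` resolves to the record from the second writer and `k2` to the record from the first, even though the second writer's file is read last.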
+
+![compaction procedure](compaction.png)
+
+#### Non-Serial Compaction Plan Schedule
+Currently, the compaction plan scheduling must be in serial order with the 
writers, which means that while scheduling the compaction plan, no ongoing 
writers should be writing to
+the table. This restriction makes compaction almost impossible for multiple 
streaming writers because there is always an instant writing to the table for 
streaming ingestion.
+
+In order to unblock the compaction plan scheduling and keep the completeness 
of the readers, we introduce the completion time for file slice generation:
+
+- Support quick look-up from instant time ➝ completion time: 
[HUDI-6539](https://issues.apache.org/jira/browse/HUDI-6539) supports fast 
completion time queries on the archived timeline; based on this, we are able
+to support flexible completion time queries on both the active and archived 
timeline, see [HUDI-6725](https://issues.apache.org/jira/browse/HUDI-6725);
+- New compaction plan scheduling that complies with the completion time, that 
is: only log files with a completion time smaller than the compaction start 
instant time should be considered;
+- New file slice generation strategy: a log file with a smaller instant time 
than the compaction instant time but a greater completion time should be 
assigned to a new file slice;
+- Combining the second and third points, in general we slice by comparing the 
compaction start time with the log files' completion time.
+
+<img src="non_serial_compaction.png" alt="drawing" width="400"/>
+
+Assume we have two log files, with instant time & completion time as [t1, t3] 
and [t2, t5]. At t4, a compaction plan is scheduled;
+the plan does not include file t2_t5_v1.w2. In the reader view, that log file 
should be assigned to a new file slice rather than the one compacted by the t4 
plan.
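
The example above can be sketched as follows, modelling t1..t5 as the integers 1..5. The `LogFile` class and `split_by_compaction` helper are hypothetical names for illustration and do not correspond to Hudi classes.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class LogFile:
    name: str
    instant_time: int     # when the write started
    completion_time: int  # when the write committed


def split_by_compaction(
    log_files: List[LogFile], compaction_instant: int
) -> Tuple[List[LogFile], List[LogFile]]:
    """Return (files covered by the plan, files for the next file slice)."""
    # Only the completion time is compared; the instant time is ignored.
    in_plan = [f for f in log_files if f.completion_time < compaction_instant]
    next_slice = [f for f in log_files if f.completion_time >= compaction_instant]
    return in_plan, next_slice


files = [LogFile("t1_t3", 1, 3), LogFile("t2_t5", 2, 5)]
in_plan, next_slice = split_by_compaction(files, compaction_instant=4)
```

The second file started before t4 but completed after it, so it falls outside the plan and belongs to the next file slice.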
+
+#### Global Monotonically Increasing Timestamp
+
+In order to make the time deterministic among cloud storages, we use a logical 
time generated by a special **TimeGenerator**, see 
[HUDI-1623](https://issues.apache.org/jira/browse/HUDI-1623) for details.
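
To illustrate only the monotonicity requirement, here is a process-local sketch with a hypothetical `MonotonicTimeGenerator` class. It is not the HUDI-1623 design, which has to provide the guarantee across writers; the mutex below merely guards an in-memory counter inside one process.

```python
import threading
import time


class MonotonicTimeGenerator:
    """Hypothetical generator that always returns a strictly larger value."""

    def __init__(self, clock=time.time_ns):
        self._clock = clock
        self._last = 0
        self._lock = threading.Lock()

    def next_timestamp(self) -> int:
        with self._lock:
            # If the clock stalls or goes backwards, step past the last value.
            self._last = max(self._clock(), self._last + 1)
            return self._last


# A frozen clock still yields strictly increasing timestamps.
frozen = MonotonicTimeGenerator(clock=lambda: 100)
stamps = [frozen.next_timestamp() for _ in range(3)]
```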
+
+### The Log File Naming Convention
+
+We use the current instant time instead of the base commit time in the file 
name,
+so that tasks from different writers cannot produce conflicting file names. We 
can also parse the file name to fetch the instant time quickly.
+Finally, the log file name follows this pattern:
+
+```shell
+${uuid}_${instant}.log.${version}_${task_token}

Review Comment:
   Updated, but I'm -1 on introducing the `base Transaction timestamp` in the 
file name; no one knows what that is.


