yihua commented on a change in pull request #3527:
URL: https://github.com/apache/hudi/pull/3527#discussion_r701389046



##########
File path: website/blog/2021-08-18-improving-marker-mechanism.md
##########
@@ -0,0 +1,65 @@
+---
+title: "Improving Marker Mechanism in Apache Hudi"
+excerpt: "We introduce a new marker mechanism leveraging the timeline server 
to address performance bottlenecks due to rate-limiting on cloud storage like 
AWS S3."
+author: yihua
+category: blog
+---
+Write operations in an Apache Hudi table use markers to efficiently identify 
the data files written to the file system.  In this blog, we dive into the 
design of the existing direct marker file mechanism and explain its performance 
problem on cloud storage like AWS S3.  We demonstrate how we improve the write 
performance with timeline-server-based markers.
+
+<!--truncate-->
+
+## What a marker is and why write operations on a Hudi table need it
+ 
+A **marker** in Hudi, such as a marker file with a unique filename, is a label 
to indicate that a corresponding data file exists in the file system.  Each 
marker entry is composed of three parts: the data file name, the marker 
extension (`.marker`), and the I/O type (`CREATE`, `MERGE`, or `APPEND`).  For 
example, the marker 
`91245ce3-bb82-4f9f-969e-343364159174-0_140-579-0_20210820173605.parquet.marker.CREATE`
 indicates that the corresponding data file is 
`91245ce3-bb82-4f9f-969e-343364159174-0_140-579-0_20210820173605.parquet` and 
the I/O type is `CREATE`.  Before writing each data file, the Hudi write client 
first creates a marker in the file system.  Markers persist in the file 
system until the write client explicitly deletes them.  The write 
client deletes all markers when the commit succeeds.
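+
As a minimal sketch of the naming scheme described above, the following Python 
snippet builds and parses marker names.  The helper names (`Marker`, 
`to_marker_name`, `parse_marker_name`) are hypothetical and only illustrate the 
three-part format; they are not Hudi APIs.

```python
from typing import NamedTuple

MARKER_EXTENSION = ".marker"
IO_TYPES = ("CREATE", "MERGE", "APPEND")


class Marker(NamedTuple):
    data_file_name: str
    io_type: str


def to_marker_name(data_file_name: str, io_type: str) -> str:
    """Build '<data file name>.marker.<I/O type>'."""
    if io_type not in IO_TYPES:
        raise ValueError(f"unknown I/O type: {io_type!r}")
    return f"{data_file_name}{MARKER_EXTENSION}.{io_type}"


def parse_marker_name(marker_name: str) -> Marker:
    """Split a marker name back into the data file name and the I/O type."""
    prefix, sep, io_type = marker_name.rpartition(".")
    if not sep or io_type not in IO_TYPES or not prefix.endswith(MARKER_EXTENSION):
        raise ValueError(f"not a marker name: {marker_name!r}")
    return Marker(prefix[: -len(MARKER_EXTENSION)], io_type)


example = (
    "91245ce3-bb82-4f9f-969e-343364159174-0_140-579-0_20210820173605"
    ".parquet.marker.CREATE"
)
marker = parse_marker_name(example)
print(marker.data_file_name)  # the .parquet data file name
print(marker.io_type)         # CREATE
```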
+
+The markers are useful for efficiently carrying out different operations by 
the write client.  Two important operations use markers to find the data files 
of interest, instead of scanning the whole Hudi table:
+  - **Identifying duplicate data files**: in Spark, the Hudi write client 
delegates the data file writing to multiple executors.  A task on an executor 
can fail, leaving partial data files written, and Spark retries the task 
until it succeeds.  The markers help efficiently identify the partial 
data files, which duplicate data in the data files written by the later 
successful attempt; these duplicate data files are 
cleaned up before the write and commit are finalized.
+  - **Rolling back failed commits**: the write operation can fail in the 
middle, leaving some data files written in the file system.  In this case, the 
marker entries stay in the file system because the commit failed.  In the next 
write operation, the write client first rolls back the failed commits, by 
identifying the data files written in these commits through the markers and 
deleting them.
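+
The two operations above can be sketched with plain set operations.  This is an 
illustrative model only: it assumes the write client knows which data files the 
successful task attempts reported, it ignores the I/O type of each marker, and 
the function names are hypothetical rather than Hudi APIs.

```python
def data_file_from_marker(marker_name: str) -> str:
    """Strip the trailing '.marker.<I/O type>' from a marker name."""
    return marker_name.rsplit(".marker.", 1)[0]


def duplicate_files(marker_names, committed_files):
    """Files that have a marker but were not reported by a successful task."""
    marked = {data_file_from_marker(m) for m in marker_names}
    return sorted(marked - set(committed_files))


def files_to_roll_back(marker_names):
    """For a failed commit, every marked data file is a rollback candidate."""
    return sorted({data_file_from_marker(m) for m in marker_names})


markers = [
    "f1-attempt0.parquet.marker.CREATE",  # partial file from a failed task
    "f1-attempt1.parquet.marker.CREATE",  # file from the successful retry
]
print(duplicate_files(markers, ["f1-attempt1.parquet"]))
print(files_to_roll_back(markers))
```

Either way, only the markers of one commit are examined, not the whole table.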
+
+Next, we dive into the existing marker mechanism, explain its performance 
problem, and demonstrate the new timeline-server-based marker mechanism to 
address the problem.
+
+## Existing direct marker file mechanism and its limitations
+
+The **existing marker mechanism** simply creates a new marker file 
corresponding to each data file, with the marker filename as described above.  
Each marker file is written to the file system in the same directory hierarchy, 
i.e., commit instant and partition path, under a temporary folder 
`.hoodie/.temp` under the base path of the Hudi table.  The figure 
below shows an example of the marker files created and the corresponding data 
files when writing data to the Hudi table.  When getting or deleting all the 
marker file paths, the mechanism first lists all the paths under the temporary 
folder, `.hoodie/.temp/<commit_instant>`, and then does the operation.
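+
To make the layout concrete, here is a small sketch that assembles a direct 
marker file path from the pieces named above.  The function names and the 
example bucket, instant, and partition values are made up for illustration.

```python
def marker_dir(base_path: str, commit_instant: str) -> str:
    """Temporary folder that holds all markers of one commit instant."""
    return f"{base_path.rstrip('/')}/.hoodie/.temp/{commit_instant}"


def marker_path(base_path: str, commit_instant: str, partition_path: str,
                data_file_name: str, io_type: str) -> str:
    """Marker file path mirroring the partition path of the data file."""
    parts = [
        marker_dir(base_path, commit_instant),
        partition_path.strip("/"),  # empty for a non-partitioned table
        f"{data_file_name}.marker.{io_type}",
    ]
    return "/".join(p for p in parts if p)


print(marker_path("s3://bucket/table", "20210820173605",
                  "2021/08/20", "f1.parquet", "CREATE"))
```

Listing or deleting all markers of a commit then amounts to walking everything 
under `marker_dir(...)`, which is one request per file on a store like S3.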
+
+![An example of marker and data files in direct marker file 
mechanism](/assets/images/blog/marker-mechanism/direct-marker-file-mechanism.png)
+
+As the number of data files to write increases, so does the number of marker 
files to create.  This can create performance bottlenecks for cloud storage 
such as AWS S3.  In AWS S3, each file create and delete call triggers an HTTP 
request and there is 
[rate-limiting](https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html)
 on how many requests can be processed per second per prefix in a bucket.  When 
the number of data files written concurrently, and hence the number of marker 
files, is large, the marker file operations become the performance bottleneck.  
In one case, marker file deletion took an hour to finish due to S3 
rate-limiting for a bulk insert operation that ran for a few hours.  Such 
delays degrade the performance of the write.
+
+## Timeline-server-based marker mechanism improving write performance
+
+To address the performance bottleneck due to rate-limiting of AWS S3 explained 
above, we introduce a **new marker mechanism leveraging the timeline server**, 
which optimizes the marker-related latency for file systems with non-trivial 
file I/O latency.  The **timeline server** in Hudi serves as a centralized 
place for providing the file system and timeline views. As shown below, the new 
timeline-server-based marker mechanism delegates the marker creation and other 
marker-related operations from individual executors to the timeline server for 
centralized processing.  The timeline server maintains the created markers in 
memory for corresponding marker requests.  The timeline server achieves 
consistency by periodically flushing the in-memory markers to a limited number 
of underlying files in the file system.  In such a way, the number of actual 
file operations and latency related to markers can be significantly reduced 
even with a huge number of data files, thus improving the performance of the 
writes.
+
+![Timeline-server-based marker 
mechanism](/assets/images/blog/marker-mechanism/timeline-server-based-marker-mechanism.png)
+
+To improve the efficiency of processing marker creation requests, we designed 
batch processing into the handler of marker requests at the timeline server.  
Each marker creation request is handled asynchronously in the Javalin timeline 
server and queued before processing. For every batch interval, e.g., 20ms, a 
dispatching thread pulls the pending requests from the queue and sends them to 
a worker thread for processing. Each worker thread processes the marker 
creation requests, sets the responses, and flushes the new markers by 
overwriting the underlying file storing the markers in the file system.  
Because overwriting the file can take longer than the batch interval, multiple 
worker threads run concurrently, and each worker thread writes 
to an exclusive file not touched by other threads, to guarantee consistency and 
correctness. Both the batch interval and the number of worker threads can be 
configured through the write options.
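+
The batching flow can be modeled in a few lines of Python.  This is a toy, 
single-process sketch and not the timeline server's implementation: the class 
and method names are hypothetical, a list stands in for each underlying file, 
and the worker runs inline (instead of on its own thread) so the behavior stays 
deterministic.

```python
import queue
from concurrent.futures import Future


class MarkerBatchProcessor:
    """Toy model of batched marker creation with one exclusive file per worker."""

    def __init__(self, num_workers: int):
        self.pending = queue.Queue()  # queued (marker_name, future) requests
        self.all_markers = set()      # in-memory copy of every marker
        self.files = [[] for _ in range(num_workers)]  # stand-ins for files
        self.next_worker = 0

    def create_marker(self, marker_name: str) -> Future:
        """Enqueue a request; its future resolves only after the flush."""
        future = Future()
        self.pending.put((marker_name, future))
        return future

    def dispatch_once(self) -> None:
        """What the dispatching thread would do once per batch interval."""
        batch = []
        while True:
            try:
                batch.append(self.pending.get_nowait())
            except queue.Empty:
                break
        if batch:
            worker = self.next_worker
            self.next_worker = (worker + 1) % len(self.files)
            self._process(worker, batch)

    def _process(self, worker: int, batch) -> None:
        results = []
        contents = list(self.files[worker])
        for marker_name, future in batch:
            created = marker_name not in self.all_markers  # duplicate check
            if created:
                self.all_markers.add(marker_name)
                contents.append(marker_name)
            results.append((future, created))
        self.files[worker] = contents    # "overwrite" this worker's file
        for future, created in results:  # respond only after the flush
            future.set_result(created)


processor = MarkerBatchProcessor(num_workers=2)
first = processor.create_marker("f1.parquet.marker.CREATE")
again = processor.create_marker("f1.parquet.marker.CREATE")
processor.dispatch_once()
print(first.result(), again.result())  # True False
```

However many requests arrive in one interval, they cost a single file 
overwrite, which is where the reduction in file system calls comes from.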
+
+![Batched processing of marker creation 
requests](/assets/images/blog/marker-mechanism/batched-marker-creation.png)
+
+
+Note that the worker thread always checks whether the marker has already been 
created by comparing the marker name from the request with the in-memory copy 
of all markers maintained at the timeline server. The underlying files storing 
the markers are only read upon the first marker request (lazy loading).  The 
responses to requests are only sent back once the new markers are flushed to 
the files, so that if the timeline server fails, it can recover the already 
created markers from the files. Together, these measures ensure consistency 
between the file system and the in-memory copy, and improve the performance of 
processing marker requests.
+
+## Marker-related write options
+
+We introduce the following new marker-related write options in the 0.9.0 
release to configure the marker mechanism.
+
+| Property Name |   Default   |     Meaning    |
+| ------------- | ----------- | :-------------:|
+| `hoodie.write.markers.type`     | direct | Marker type to use.  Two modes 
are supported: (1) `direct`: individual marker file corresponding to each data 
file is directly created by the writer; (2) `timeline_server_based`: marker 
operations are all handled at the timeline service which serves as a proxy.  
New marker entries are batch processed and stored in a limited number of 
underlying files for efficiency. |
+| `hoodie.markers.timeline_server_based.batch.num_threads`     | 20 | Number 
of threads to use for batch processing marker creation requests at the timeline 
server. | 
+| `hoodie.markers.timeline_server_based.batch.interval_ms` | 50 | The batch 
interval in milliseconds for marker creation batch processing. |
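+
As a hedged example, the options above could be collected into a dictionary and 
passed to a Spark write.  The values for the two batch options are simply the 
defaults from the table.  The `df` and `base_path` names in the comment are 
placeholders, and a real Hudi write needs further options that are out of scope 
here.

```python
# Option keys and values are taken from the table above.
marker_options = {
    "hoodie.write.markers.type": "timeline_server_based",
    "hoodie.markers.timeline_server_based.batch.num_threads": "20",
    "hoodie.markers.timeline_server_based.batch.interval_ms": "50",
}

# Assumed usage with an existing Spark DataFrame `df` and table path `base_path`:
# df.write.format("hudi").options(**marker_options).mode("append").save(base_path)

for key, value in marker_options.items():
    print(f"{key}={value}")
```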
+
+## Performance
+
+We evaluate the write performance over both direct and timeline-server-based 
marker mechanisms by bulk-inserting a large dataset using Amazon EMR with Spark 
and S3. The input data is around 100GB.  We configure the write operation to 
generate a large number of data files concurrently by setting the max parquet 
file size to be 1MB and parallelism to be 240.
+
+As shown below, the timeline-server-based marker mechanism generates far 
fewer files storing markers because of the batch processing, leading to much 
less time spent on marker-related I/O operations, and thus achieves a 31% lower 
write completion time (38min vs. 55min) compared to the direct marker file 
mechanism.
+
+| Marker Type |   Input   |  Number of data files generated | Files created for 
markers | Marker deletion time | Bulk Insert Time (including marker deletion) |
+| ----------- | --------- | :---------: | :---------: | :---------: | 
:---------: | 
+| Direct | ~100GB | 165k | 165k | 15min | 55min |
+| Timeline-server-based | ~100GB | 165k | 20 | ~3s | 38min |
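+
The relative improvements follow directly from the table.  A quick check of the 
arithmetic, treating the approximate `~3s` deletion time as exactly 3 seconds:

```python
direct_total_min, timeline_total_min = 55, 38
direct_delete_s, timeline_delete_s = 15 * 60, 3  # 15min vs. ~3s

# Relative reduction in bulk insert time, including marker deletion.
write_time_reduction = (direct_total_min - timeline_total_min) / direct_total_min
# How many times faster marker deletion becomes.
deletion_speedup = direct_delete_s / timeline_delete_s

print(f"write time reduction: {write_time_reduction:.1%}")  # 30.9%
print(f"marker deletion speedup: {deletion_speedup:.0f}x")  # 300x
```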
+
+## Conclusion
+
+We identify that the existing direct marker file mechanism incurs performance 
bottlenecks due to the rate-limiting of file create and delete calls on cloud 
storage like AWS S3.  To address this issue, we introduce a new marker 
mechanism leveraging the timeline server, which delegates the marker creation 
and other marker-related operations from individual executors to the timeline 
server and uses batch processing to improve performance.  In our evaluation 
on Amazon EMR with Spark and S3, marker deletion time drops from 15min to ~3s 
and overall bulk insert time from 55min to 38min.

Review comment:
       Right, fixed in the follow-up minor PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
