trushev opened a new pull request, #7626:
URL: https://github.com/apache/hudi/pull/7626

   This PR significantly reduces the memory footprint of workloads with 
thousands of active partitions between checkpoints. Such workloads are typical 
when the checkpoint interval is wide. More specifically, an active partition 
here is a special case of an active fileId.
   The write client holds a map of write handles so that it can create a 
ReplaceHandle between checkpoints. On such workloads this leads to 
`OutOfMemoryError`, because each write handle is a huge object.
   Essentially, it is enough to hold only the write path instead of the whole 
handle.
   
   ### Change Logs
   
   1. Released the writer in the closed create handle. The same approach is 
already used in append and merge handles.
   1. Introduced `FlinkClosedHandle`, which has the lowest memory footprint. It 
is needed because a create handle is a huge object even after its writer is 
released.
   1. Removed the append handle from the handle map because it is not used 
anyway. This reduces the memory footprint and fixes a potential NPE.
   1. Replaced `HoodieWriteHandle<?, ?, ?, ?>` with `MiniBatchHandle` in the 
Flink modules. This is needed because the lightweight `FlinkClosedHandle` 
implements `MiniBatchHandle` and does not extend `HoodieWriteHandle`. The 
refactoring is safe because all Flink handles implement `MiniBatchHandle`.
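
The idea behind the closed handle can be sketched in plain Java. This is a minimal illustration, not the code in this PR: `TrackedHandle`, `OpenHandle`, `ClosedHandle` and `HandleRegistry` are hypothetical names standing in for `MiniBatchHandle`, a create handle, `FlinkClosedHandle` and the write client's handle map. It assumes the map only needs the write path once a handle is closed.

```java
import java.util.HashMap;
import java.util.Map;

public class HandleMapSketch {

  /** Hypothetical minimal contract that the handle map relies on. */
  interface TrackedHandle {
    String writePath();
    boolean isClosed();
  }

  /** Hypothetical heavy handle: owns a large buffer while it is open. */
  static final class OpenHandle implements TrackedHandle {
    private final String path;
    private byte[] buffer; // stands in for writer state, schemas, and so on

    OpenHandle(String path, int bufferBytes) {
      this.path = path;
      this.buffer = new byte[bufferBytes];
    }

    public String writePath() { return path; }
    public boolean isClosed() { return buffer == null; }

    /** Releases the heavy state; the object itself may still be large. */
    void close() { buffer = null; }
  }

  /** Hypothetical lightweight replacement: keeps only the write path. */
  static final class ClosedHandle implements TrackedHandle {
    private final String path;

    ClosedHandle(String path) { this.path = path; }

    public String writePath() { return path; }
    public boolean isClosed() { return true; }
  }

  /** Hypothetical handle map keyed by fileId. */
  static final class HandleRegistry {
    private final Map<String, TrackedHandle> handles = new HashMap<>();

    void open(String fileId, OpenHandle handle) { handles.put(fileId, handle); }

    /** Closes the handle and swaps in a path-only record for later lookups. */
    void close(String fileId) {
      TrackedHandle handle = handles.get(fileId);
      if (handle instanceof OpenHandle) {
        ((OpenHandle) handle).close();
        handles.put(fileId, new ClosedHandle(handle.writePath()));
      }
    }

    TrackedHandle get(String fileId) { return handles.get(fileId); }
  }

  public static void main(String[] args) {
    HandleRegistry registry = new HandleRegistry();
    registry.open("file-1", new OpenHandle("/tmp/sink/part1/file-1", 1 << 20));
    registry.close("file-1");
    TrackedHandle handle = registry.get("file-1");
    System.out.println(handle.writePath() + " closed=" + handle.isClosed());
  }
}
```

Because the map is typed against the small interface rather than the heavy class, the path-only record can replace the original handle without changing any caller that only reads the write path.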
   
   ### Impact
   
   Workload test run with `-Xmx2048m`:
   ```SQL
   create table source (
       `id` int,
       `data` string
   ) with (
       'connector' = 'datagen',
       'rows-per-second' = '100',
       'fields.id.kind' = 'sequence',
       'fields.id.start' = '0',
       'fields.id.end' = '3000'
   );
   create table sink (
       `id` int primary key,
       `data` string,
       `part` string
   ) partitioned by (`part`) with (
       'connector' = 'hudi',
       'path' = '/tmp/sink',
       'write.batch.size' = '0.001',  -- 1024 bytes
       'write.task.max.size' = '101.001',  -- 101.001MB
       'write.merge.max_memory' = '1'  -- 1 MB
   );
   
   insert into sink select `id`, `data`, concat('part', cast(`id` as string)) 
as `part` from source;
   ```
   
   #### Before
   
   The job fails with `OutOfMemoryError` at partition 550. The heap dump shows 
that the handle map holds 1.4 GB.
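
As a rough back-of-the-envelope check, the numbers above imply a per-handle cost of a few megabytes. The sketch below assumes roughly one retained handle per partition, linear growth, and that the reported 1.4 GB is measured in binary units. `HandleFootprintEstimate` and its methods are illustrative names, and the result is an estimate, not a measurement.

```java
public class HandleFootprintEstimate {

  /** Average MiB retained per handle, given the map size in GiB. */
  static double mibPerHandle(double mapGib, int handles) {
    return mapGib * 1024.0 / handles;
  }

  /** Projected map size in GiB if every partition kept a full handle. */
  static double projectedGib(double mibPerHandle, int partitions) {
    return mibPerHandle * partitions / 1024.0;
  }

  public static void main(String[] args) {
    double perHandle = mibPerHandle(1.4, 550);          // values from the heap dump
    double projected = projectedGib(perHandle, 3000);   // partitions in the test workload
    System.out.printf("~%.1f MiB per handle, ~%.1f GiB for 3000 partitions%n",
        perHandle, projected);
  }
}
```

Under these assumptions, keeping full handles for all 3000 partitions would need several times the 2 GB heap used in the test.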
   
   <img width="1300" alt="Screenshot 2023-01-09 at 11 13 13" 
src="https://user-images.githubusercontent.com/42293632/211240084-c1c136d9-b8bf-46c2-9dde-48404fc5f38a.png">
   
   
![fix-oom-before](https://user-images.githubusercontent.com/42293632/211239403-d0609d83-e56d-4326-9d5d-64db3c80bce9.png)
   
   #### After
   
   3000 rows are written successfully.
   
![fix-oom-after](https://user-images.githubusercontent.com/42293632/211239453-4898444f-1b76-4132-8bfc-891c716ca92a.png)
   
   ### Risk level (write none, low medium or high below)
   
   Low
   
   ### Documentation Update
   
   _Describe any necessary documentation update if there is any new feature, 
config, or user-facing change_
   
   - _The config description must be updated if new configs are added or the 
default value of the configs are changed_
   - _Any new feature or user-facing change requires updating the Hudi website. 
Please create a Jira ticket, attach the
     ticket number here and follow the 
[instruction](https://hudi.apache.org/contribute/developer-setup#website) to 
make
     changes to the website._
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   

