[
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694438#comment-17694438
]
Yanfei Lei edited comment on FLINK-31238 at 2/28/23 8:49 AM:
-------------------------------------------------------------
Thanks for sharing, this drastically reduces the number of files that need
to be traversed. I am also curious about how the seqno is handled: after
rewriting, is the seqno in the new SST file consistent with the original?
Is there any difference between ingestDB() and
[IngestExternalFile()|https://rocksdb.org/blog/2017/02/17/bulkoad-ingest-sst-file.html]?
BTW, are you planning to push clipDB() and ingestDB() to the RocksDB community?
> Use IngestDB to speed up Rocksdb rescaling recovery
> ----------------------------------------------------
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Affects Versions: 1.16.1
> Reporter: Yue Ma
> Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png,
> image-2023-02-27-16-57-18-435.png
>
>
> There have already been many discussions and optimizations in the community
> around RocksDB rescaling and recovery:
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket.
> Rescaling and recovering a RocksDB instance essentially requires two steps:
> # Insert the valid keyGroup data for the new task.
> # Delete the invalid data in the old stateHandles.
> The current approach is to pick a main DB first and then insert the remaining
> data with writeBatch. In addition, deleteRange is used to speed up clipping
> the DB. But in our production environment we found that rescaling is still
> very slow, especially when the state of a single Task is large. A minimal
> sketch of this current path is shown below.
>
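> For reference, here is a minimal sketch of this current path, assuming the
> org.rocksdb Java API (Arrays.compareUnsigned needs Java 9+); the key-group
> range parameters are illustrative, not Flink's actual key serialization:
> {code:java}
> import java.util.Arrays;
>
> import org.rocksdb.ColumnFamilyHandle;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
> import org.rocksdb.RocksIterator;
> import org.rocksdb.WriteBatch;
> import org.rocksdb.WriteOptions;
>
> final class WriteBatchRestoreSketch {
>
>     /** Copy the target key-group range [start, end) of a temporary DB into the main DB. */
>     static void copyRange(RocksDB mainDb, ColumnFamilyHandle mainCf,
>                           RocksDB tmpDb, ColumnFamilyHandle tmpCf,
>                           byte[] start, byte[] end) throws RocksDBException {
>         try (WriteBatch batch = new WriteBatch();
>              WriteOptions writeOptions = new WriteOptions();
>              RocksIterator it = tmpDb.newIterator(tmpCf)) {
>             it.seek(start);
>             while (it.isValid() && Arrays.compareUnsigned(it.key(), end) < 0) {
>                 batch.put(mainCf, it.key(), it.value());
>                 it.next();
>             }
>             mainDb.write(writeOptions, batch);
>         }
>     }
>
>     /** Drop a key-group range the task no longer owns; this still produces range tombstones. */
>     static void clipWithDeleteRange(RocksDB db, ColumnFamilyHandle cf,
>                                     byte[] invalidStart, byte[] invalidEnd) throws RocksDBException {
>         db.deleteRange(cf, invalidStart, invalidEnd);
>     }
> }
> {code}
>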
> We hope that the previous SST files can be reused directly when restoring
> state, instead of re-traversing the data. So we made some attempts to optimize
> this in our internal versions of Flink and frocksdb.
>
> We added two APIs, *ClipDb* and *IngestDb*, to frocksdb.
> * ClipDB is used to clip the data of a DB to a given key range. Unlike
> db.DeleteRange and db.Delete, no DeleteValue or RangeTombstone is generated
> for the parts beyond the key range. Instead, we iterate over the FileMetaData
> of the DB and process each SST file. There are three cases (see the sketch
> after this list):
> ** If all keys of the file are within the required range, we keep the SST
> file and do nothing.
> ** If all keys of the SST file are outside the specified range, we delete the
> file directly.
> ** If only part of the SST file is needed, we rewrite the required keys into
> a new SST file.
> All SST file changes are collected in a VersionEdit, and the current Version
> will LogAndApply this edit to ensure that the changes take effect.
> * IngestDb is used to directly ingest all SST files of one DB into another
> DB. It is necessary to strictly ensure that the key ranges of the two DBs do
> not overlap, which is easy to guarantee in the Flink scenario. Files are
> ingested via hard links, so the operation is very fast. At the same time, the
> file numbers of the main DB are incremented sequentially, and the
> SequenceNumber of the main DB is updated to the larger SequenceNumber of the
> two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as
> follows (a code sketch follows the figure below):
> * Open the first StateHandle as the main DB and pause its compaction.
> * Clip the main DB to the KeyGroup range of the Task with ClipDB.
> * Open the other StateHandles in sequence as tmp DBs, and clip each of them
> to the KeyGroup range with ClipDb.
> * Ingest every tmp DB into the main DB after it has been clipped.
> * Re-enable compaction on the main DB.
> !image-2023-02-27-16-57-18-435.png|width=434,height=152!
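>
> Put together, the restore path above could look roughly like the sketch
> below. clipDB() and ingestDB() are the proposed frocksdb APIs; the Java
> interface used here is an assumed binding for illustration, not an existing
> API, and pauseBackgroundWork()/continueBackgroundWork() stand in for pausing
> and resuming compaction:
> {code:java}
> import java.util.List;
>
> import org.rocksdb.ColumnFamilyHandle;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
>
> final class IngestDbRestoreSketch {
>
>     /** Assumed Java surface for the proposed clipDB()/ingestDB() frocksdb APIs. */
>     interface ClipIngestDb {
>         RocksDB db();
>         void clipDB(ColumnFamilyHandle cf, byte[] start, byte[] end) throws RocksDBException;
>         void ingestDB(ClipIngestDb other) throws RocksDBException;
>     }
>
>     static void restore(ClipIngestDb mainDb, ColumnFamilyHandle mainCf,
>                         List<ClipIngestDb> tmpDbs, List<ColumnFamilyHandle> tmpCfs,
>                         byte[] keyGroupStart, byte[] keyGroupEnd) throws RocksDBException {
>         // 1. The first StateHandle has been opened as the main DB; pause its compaction.
>         mainDb.db().pauseBackgroundWork();
>         try {
>             // 2. Clip the main DB to the task's KeyGroup range.
>             mainDb.clipDB(mainCf, keyGroupStart, keyGroupEnd);
>             for (int i = 0; i < tmpDbs.size(); i++) {
>                 // 3. Clip each remaining StateHandle (opened as a tmp DB) to the same range.
>                 tmpDbs.get(i).clipDB(tmpCfs.get(i), keyGroupStart, keyGroupEnd);
>                 // 4. Hard-link the clipped tmp DB's SST files into the main DB.
>                 mainDb.ingestDB(tmpDbs.get(i));
>             }
>         } finally {
>             // 5. Re-enable compaction on the main DB.
>             mainDb.db().continueBackgroundWork();
>         }
>     }
> }
> {code}
>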
> We have run some benchmarks on our internal Flink version. The results show
> that, compared with the writeBatch method, rescaling recovery with IngestDb
> is 5 to 10 times faster, as shown below. SST_File_Writer denotes the recovery
> method that generates SST files in parallel through SstFileWriter; a sketch
> of that baseline follows this paragraph. All values are seconds per operation
> (s/op) for three iterations each.
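>
> For comparison, the SST_File_Writer baseline can be sketched with the
> existing RocksJava API roughly as follows; this is not the exact Flink
> implementation, and the range parameters and file paths are illustrative:
> {code:java}
> import java.util.Arrays;
> import java.util.List;
>
> import org.rocksdb.ColumnFamilyHandle;
> import org.rocksdb.EnvOptions;
> import org.rocksdb.IngestExternalFileOptions;
> import org.rocksdb.Options;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
> import org.rocksdb.RocksIterator;
> import org.rocksdb.SstFileWriter;
>
> final class SstFileWriterBaselineSketch {
>
>     /** Write one handle's in-range data into a standalone SST file (keys must be added in order). */
>     static void writeSst(RocksDB tmpDb, ColumnFamilyHandle tmpCf,
>                          byte[] start, byte[] end, String sstPath) throws RocksDBException {
>         try (Options options = new Options();
>              EnvOptions envOptions = new EnvOptions();
>              SstFileWriter writer = new SstFileWriter(envOptions, options);
>              RocksIterator it = tmpDb.newIterator(tmpCf)) {
>             writer.open(sstPath);
>             it.seek(start);
>             while (it.isValid() && Arrays.compareUnsigned(it.key(), end) < 0) {
>                 writer.put(it.key(), it.value());
>                 it.next();
>             }
>             writer.finish();
>         }
>     }
>
>     /** Ingest the SST files (written in parallel, one per handle) into the main DB. */
>     static void ingest(RocksDB mainDb, ColumnFamilyHandle mainCf,
>                        List<String> sstPaths) throws RocksDBException {
>         try (IngestExternalFileOptions opts = new IngestExternalFileOptions().setMoveFiles(true)) {
>             mainDb.ingestExternalFile(mainCf, sstPaths, opts);
>         }
>     }
> }
> {code}
>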
> * parallelism changes from 4 to 2
> |*TaskStateSize*|*Write_Batch* (iter 1/2/3)|*SST_File_Writer* (iter 1/2/3)|*Ingest_DB* (iter 1/2/3)|
> |500M|8.018 / 9.551 / 7.486 s/op|6.041 / 5.934 / 6.707 s/op|{color:#ff0000}3.922 / 3.208 / 3.096 s/op{color}|
> |1G|19.686 / 19.402 / 21.146 s/op|17.538 / 16.933 / 15.486 s/op|{color:#ff0000}6.207 / 7.164 / 6.397 s/op{color}|
> |5G|244.795 / 243.141 / 253.542 s/op|78.058 / 85.635 / 76.568 s/op|{color:#ff0000}23.397 / 21.387 / 22.858 s/op{color}|
> * parallelism changes from 4 to 8
> |*TaskStateSize*|*Write_Batch* (iter 1/2/3)|*SST_File_Writer* (iter 1/2/3)|*Ingest_DB* (iter 1/2/3)|
> |500M|3.477 / 3.515 / 3.433 s/op|3.453 / 3.300 / 3.313 s/op|{color:#ff0000}0.941 / 0.963 / 1.102 s/op{color}|
> |1G|7.571 / 7.352 / 7.568 s/op|5.032 / 4.689 / 6.883 s/op|{color:#ff0000}2.130 / 2.110 / 2.034 s/op{color}|
> |5G|91.870 / 94.229 / 93.271 s/op|25.845 / 25.571 / 25.685 s/op|{color:#ff0000}11.154 / 10.732 / 10.622 s/op{color}|
> * parallelism changes from 4 to 6
> |*TaskStateSize*|*Write_Batch* (iter 1/2/3)|*SST_File_Writer* (iter 1/2/3)|*Ingest_DB* (iter 1/2/3)|
> |500M|8.209 / 9.893 / 9.150 s/op|6.041 / 5.934 / 6.707 s/op|{color:#ff0000}2.622 / 2.545 / 2.573 s/op{color}|
> |1G|21.206 / 26.214 / 20.269 s/op|10.043 / 10.744 / 10.461 s/op|{color:#ff0000}4.400 / 4.340 / 6.234 s/op{color}|
> |5G|170.606 / 160.576 / 159.425 s/op|52.537 / 50.576 / 50.823 s/op|{color:#ff0000}19.053 / 18.504 / 18.249 s/op{color}|
> * parallelism changes from 4 to 3
> |*TaskStateSize*|*Write_Batch* (iter 1/2/3)|*SST_File_Writer* (iter 1/2/3)|*Ingest_DB* (iter 1/2/3)|
> |500M|6.330 / 5.614 / 5.736 s/op|4.083 / 5.655 / 3.998 s/op|{color:#ff0000}2.157 / 2.201 / 3.212 s/op{color}|
> |1G|13.814 / 12.852 / 13.480 s/op|9.619 / 9.197 / 8.694 s/op|{color:#ff0000}4.227 / 4.234 / 4.177 s/op{color}|
> |5G|136.621 / 127.097 / 139.694 s/op|39.612 / 38.809 / 39.125 s/op|{color:#ff0000}16.691 / 16.599 / 16.726 s/op{color}|
--
This message was sent by Atlassian Jira
(v8.20.10#820010)