[
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694813#comment-17694813
]
Hangxiang Yu edited comment on FLINK-31238 at 3/1/23 2:17 AM:
--------------------------------------------------------------
Thank you for sharing the design. The duration of rescaling is a big concern
when the state size of a single parallel subtask becomes large, and the idea
in this design is a good solution: in a nutshell, reuse the sst files stored
in the latest checkpoint.
However, I have some concerns about this design.
1. Does the IngestDb API rely on the ingestExternalFile() provided by RocksDB?
* If yes, the restrictions on the ingestExternalFile() API make the design
difficult to realize. Reusing the sst files requires ingesting sst files from
one RocksDB instance into another, but as far as I know, RocksDB currently
forbids this behavior. Some people have asked for this feature and for lifting
the restriction, e.g., [https://github.com/facebook/rocksdb/pull/5602], but
the RocksDB community has not accepted it so far (the sketch at the end of
this comment shows the ingestion path that is supported today).
* If no, I think the IngestDb API needs to manipulate the VersionSet of
RocksDB and therefore relies on the VersionSet API. If RocksDB refactors the
VersionSet API, the IngestDb API also needs to change accordingly, so we need
to consider the maintenance cost in FRocksDB.
So I think it's better to push this feature forward in the RocksDB community.
2. Both downloading sst files and rebuilding the DB are time-consuming, and
the optimization of RocksDB rescaling needs to take both into consideration.
Local recovery is unavailable during rescaling recovery, and from my
experience, the duration of downloading sst files is often higher than that of
rebuilding the DB, especially when the bandwidth of the users' remote DFS is
limited. In this case, it's better to also consider the duration of
downloading sst files.
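For reference, a minimal RocksJava sketch of the ingestion path that vanilla
RocksDB does support today: the file must be produced by SstFileWriter with
keys added in ascending order; it cannot be a file taken from another live
DB's LSM tree. The paths and keys below are made up purely for illustration.
{code:java}
import java.util.Collections;
import org.rocksdb.*;

public class IngestExternalFileDemo {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/main-db")) {

            // The supported source: an sst file built by SstFileWriter,
            // with keys written in strictly ascending order.
            String sstPath = "/tmp/external.sst";
            try (EnvOptions envOptions = new EnvOptions();
                 SstFileWriter writer = new SstFileWriter(envOptions, options)) {
                writer.open(sstPath);
                writer.put("key-001".getBytes(), "value-1".getBytes());
                writer.put("key-002".getBytes(), "value-2".getBytes());
                writer.finish();
            }

            // Ingesting a file that belongs to another open DB's LSM tree
            // is rejected by RocksDB; that is exactly the restriction
            // discussed in concern 1 above.
            try (IngestExternalFileOptions ingestOptions =
                     new IngestExternalFileOptions().setMoveFiles(true)) {
                db.ingestExternalFile(
                    Collections.singletonList(sstPath), ingestOptions);
            }
        }
    }
}
{code}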
> Use IngestDB to speed up Rocksdb rescaling recovery
> ----------------------------------------------------
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Affects Versions: 1.16.1
> Reporter: Yue Ma
> Priority: Major
> Attachments: image-2023-02-27-16-41-18-552.png,
> image-2023-02-27-16-57-18-435.png
>
>
> There have been many discussions and optimizations in the community about
> RocksDB rescaling and recovery:
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket.
> The process of rescaling recovery in RocksDB requires two steps:
> # Insert the valid keyGroup data of the new task.
> # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main DB first and then
> insert data using writeBatch. In addition, deleteRange is currently used to
> speed up clipping the DB. But in our production environment, we found that
> the speed of rescaling is still very slow, especially when the state of a
> single Task is large.
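> A simplified RocksJava sketch of this current approach (not Flink's actual
> restore code; the DB paths and key-group boundary keys are placeholders):
> {code:java}
> import java.util.Arrays;
> import org.rocksdb.*;
> 
> public class WriteBatchRestoreSketch {
>     // Placeholder key-group boundaries of the new task; in Flink these
>     // come from the KeyGroupRange assigned to the subtask.
>     static final byte[] BEGIN_KEY_GROUP = {0x00, 0x10};
>     static final byte[] END_KEY_GROUP = {0x00, 0x20}; // exclusive
> 
>     public static void main(String[] args) throws RocksDBException {
>         RocksDB.loadLibrary();
>         try (Options options = new Options().setCreateIfMissing(true);
>              RocksDB mainDb = RocksDB.open(options, "/tmp/main-db");
>              RocksDB tmpDb = RocksDB.open(options, "/tmp/tmp-db");
>              WriteOptions writeOptions = new WriteOptions()) {
> 
>             // Step 1: re-insert the valid key-group data record by record.
>             try (RocksIterator it = tmpDb.newIterator();
>                  WriteBatch batch = new WriteBatch()) {
>                 for (it.seek(BEGIN_KEY_GROUP);
>                      it.isValid()
>                          && Arrays.compareUnsigned(it.key(), END_KEY_GROUP) < 0;
>                      it.next()) {
>                     batch.put(it.key(), it.value());
>                 }
>                 mainDb.write(writeOptions, batch);
>             }
> 
>             // Step 2: clip data outside the task's key-group range with
>             // deleteRange (shown here for the lower out-of-range part).
>             mainDb.deleteRange(new byte[] {0x00, 0x00}, BEGIN_KEY_GROUP);
>         }
>     }
> }
> {code}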
>
> We hope that the previous sst files can be reused directly when restoring
> state, instead of re-traversing the data, so we made some attempts to
> optimize this in our internal versions of Flink and FRocksDB.
>
> We added two APIs, *ClipDb* and *IngestDb*, in FRocksDB.
> * ClipDB is used to clip the data of a DB. Different from db.DeleteRange and
> db.Delete, no DeleteValue or RangeTombstone is generated for the parts beyond
> the key range. We iterate over the FileMetaData of the DB and process each
> sst file; there are three situations (see the decision sketch after this
> list):
> ** If all the keys of a file are within the required range, we keep the sst
> file and do nothing.
> ** If all the keys of the sst file exceed the specified range, we delete the
> file directly.
> ** If we only need some part of the sst file, we rewrite the required keys to
> generate a new sst file.
> All sst file changes are placed in a VersionEdit, and the current version
> will LogAndApply this edit to ensure that the changes take effect.
> * IngestDb is used to directly ingest all sst files of one DB into another
> DB, but it is necessary to strictly ensure that the keys of the two DBs do
> not overlap, which is easy to guarantee in the Flink scenario. Hard links are
> used when ingesting the files, so it is very fast. At the same time, the file
> numbers of the main DB are incremented sequentially, and the SequenceNumber
> of the main DB is updated to the larger SequenceNumber of the two DBs.
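> A minimal, self-contained sketch of the per-file decision that ClipDB makes
> for a clip range [begin, end). The FileMeta type here is a stand-in for
> RocksDB's internal FileMetaData, purely for illustration:
> {code:java}
> import java.util.Arrays;
> 
> public class ClipDecisionSketch {
>     enum Action { KEEP, DELETE, REWRITE }
> 
>     // Stand-in for RocksDB's FileMetaData: each sst file knows its
>     // smallest and largest user key.
>     record FileMeta(byte[] smallestKey, byte[] largestKey) {}
> 
>     // Decide what ClipDB does with one sst file, comparing keys in
>     // unsigned lexicographic order.
>     static Action decide(FileMeta file, byte[] begin, byte[] end) {
>         if (Arrays.compareUnsigned(file.smallestKey(), begin) >= 0
>                 && Arrays.compareUnsigned(file.largestKey(), end) < 0) {
>             return Action.KEEP;    // file fully inside the range
>         }
>         if (Arrays.compareUnsigned(file.smallestKey(), end) >= 0
>                 || Arrays.compareUnsigned(file.largestKey(), begin) < 0) {
>             return Action.DELETE;  // file fully outside the range
>         }
>         return Action.REWRITE;     // file straddles a boundary: rewrite
>                                    // only the keys inside the range
>     }
> 
>     public static void main(String[] args) {
>         byte[] begin = {0x10}, end = {0x20};
>         System.out.println(decide(
>             new FileMeta(new byte[] {0x12}, new byte[] {0x15}), begin, end)); // KEEP
>         System.out.println(decide(
>             new FileMeta(new byte[] {0x30}, new byte[] {0x40}), begin, end)); // DELETE
>         System.out.println(decide(
>             new FileMeta(new byte[] {0x05}, new byte[] {0x18}), begin, end)); // REWRITE
>     }
> }
> {code}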
> When IngestDb and ClipDb are supported, the state restoration logic is as
> follows (a sketch of this flow follows the figure below):
> * Open the first StateHandle as the main DB and pause compaction.
> * Clip the main DB to the KeyGroup range of the Task with ClipDB.
> * Open the other StateHandles in sequence as tmp DBs, and perform ClipDb on
> each of them according to the KeyGroup range.
> * Ingest all tmp DBs into the main DB after they are clipped.
> * Resume compaction of the main DB.
> !image-2023-02-27-16-57-18-435.png|width=434,height=152!
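> A hedged sketch of that restore flow; the FRocksDb interface and its
> pauseCompaction/resumeCompaction/clipDb/ingestDb methods are hypothetical
> stand-ins for the proposed FRocksDB additions, not an existing API:
> {code:java}
> import java.util.List;
> 
> public class IngestDbRestoreSketch {
>     // Hypothetical facade over the proposed FRocksDB APIs; none of these
>     // methods exist in vanilla RocksDB today.
>     interface FRocksDb extends AutoCloseable {
>         void pauseCompaction();
>         void resumeCompaction();
>         void clipDb(byte[] beginKeyGroup, byte[] endKeyGroup); // proposed ClipDB
>         void ingestDb(FRocksDb tmpDb);                         // proposed IngestDb
>     }
> 
>     static FRocksDb open(String stateHandlePath) {
>         throw new UnsupportedOperationException(
>             "stand-in for opening a DB from a downloaded state handle");
>     }
> 
>     static FRocksDb restore(List<String> handles, byte[] begin, byte[] end)
>             throws Exception {
>         // 1. Open the first StateHandle as the main DB and pause compaction.
>         FRocksDb mainDb = open(handles.get(0));
>         mainDb.pauseCompaction();
>         // 2. Clip the main DB to the task's key-group range.
>         mainDb.clipDb(begin, end);
>         // 3./4. Clip each remaining handle as a tmp DB, then ingest it.
>         for (String handle : handles.subList(1, handles.size())) {
>             try (FRocksDb tmpDb = open(handle)) {
>                 tmpDb.clipDb(begin, end);
>                 mainDb.ingestDb(tmpDb);
>             }
>         }
>         // 5. Resume compaction of the main DB.
>         mainDb.resumeCompaction();
>         return mainDb;
>     }
> }
> {code}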
> We have done some benchmark tests on our internal Flink version, and the
> results show that, compared with the writeBatch method, rescaling recovery
> with IngestDb is 5 to 10 times faster, as follows (SST_File_Writer means the
> recovery method that generates sst files through SstFileWriter in parallel):
> * parallelism changes from 4 to 2
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|Iteration 1: 8.018 s/op
> Iteration 2: 9.551 s/op
> Iteration 3: 7.486 s/op|Iteration 1: 6.041 s/op
> Iteration 2: 5.934 s/op
> Iteration 3: 6.707 s/op|{color:#ff0000}Iteration 1: 3.922 s/op{color}
> {color:#ff0000}Iteration 2: 3.208 s/op{color}
> {color:#ff0000}Iteration 3: 3.096 s/op{color}|
> |1G|Iteration 1: 19.686 s/op
> Iteration 2: 19.402 s/op
> Iteration 3: 21.146 s/op|Iteration 1: 17.538 s/op
> Iteration 2: 16.933 s/op
> Iteration 3: 15.486 s/op|{color:#ff0000}Iteration 1: 6.207 s/op{color}
> {color:#ff0000}Iteration 2: 7.164 s/op{color}
> {color:#ff0000}Iteration 3: 6.397 s/op{color}|
> |5G|Iteration 1: 244.795 s/op
> Iteration 2: 243.141 s/op
> Iteration 3: 253.542 s/op|Iteration 1: 78.058 s/op
> Iteration 2: 85.635 s/op
> Iteration 3: 76.568 s/op|{color:#ff0000}Iteration 1: 23.397 s/op{color}
> {color:#ff0000}Iteration 2: 21.387 s/op{color}
> {color:#ff0000}Iteration 3: 22.858 s/op{color}|
> * parallelism changes from 4 to 8
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|Iteration 1: 3.477 s/op
> Iteration 2: 3.515 s/op
> Iteration 3: 3.433 s/op|Iteration 1: 3.453 s/op
> Iteration 2: 3.300 s/op
> Iteration 3: 3.313 s/op|{color:#ff0000}Iteration 1: 0.941 s/op{color}
> {color:#ff0000}Iteration 2: 0.963 s/op{color}
> {color:#ff0000}Iteration 3: 1.102 s/op{color}|
> |1G|Iteration 1: 7.571 s/op
> Iteration 2: 7.352 s/op
> Iteration 3: 7.568 s/op|Iteration 1: 5.032 s/op
> Iteration 2: 4.689 s/op
> Iteration 3: 6.883 s/op|{color:#ff0000}Iteration 1: 2.130 s/op{color}
> {color:#ff0000}Iteration 2: 2.110 s/op{color}
> {color:#ff0000}Iteration 3: 2.034 s/op{color}|
> |5G|Iteration 1: 91.870 s/op
> Iteration 2: 94.229 s/op
> Iteration 3: 93.271 s/op|Iteration 1: 25.845 s/op
> Iteration 2: 25.571 s/op
> Iteration 3: 25.685 s/op|{color:#ff0000}Iteration 1: 11.154 s/op{color}
> {color:#ff0000}Iteration 2: 10.732 s/op{color}
> {color:#ff0000}Iteration 3: 10.622 s/op{color}|
> * parallelism changes from 4 to 6
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|Iteration 1: 8.209 s/op
> Iteration 2: 9.893 s/op
> Iteration 3: 9.150 s/op|Iteration 1: 6.041 s/op
> Iteration 2: 5.934 s/op
> Iteration 3: 6.707 s/op|{color:#ff0000}Iteration 1: 2.622 s/op{color}
> {color:#ff0000}Iteration 2: 2.545 s/op{color}
> {color:#ff0000}Iteration 3: 2.573 s/op{color}|
> |1G|Iteration 1: 21.206 s/op
> Iteration 2: 26.214 s/op
> Iteration 3: 20.269 s/op|Iteration 1: 10.043 s/op
> Iteration 2: 10.744 s/op
> Iteration 3: 10.461 s/op|{color:#ff0000}Iteration 1: 4.400 s/op{color}
> {color:#ff0000}Iteration 2: 4.340 s/op{color}
> {color:#ff0000}Iteration 3: 6.234 s/op{color}|
> |5G|Iteration 1: 170.606 s/op
> Iteration 2: 160.576 s/op
> Iteration 3: 159.425 s/op|Iteration 1: 52.537 s/op
> Iteration 2: 50.576 s/op
> Iteration 3: 50.823 s/op|{color:#ff0000}Iteration 1: 19.053 s/op{color}
> {color:#ff0000}Iteration 2: 18.504 s/op{color}
> {color:#ff0000}Iteration 3: 18.249 s/op{color}|
> * parallelism changes from 4 to 3
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|Iteration 1: 6.330 s/op
> Iteration 2: 5.614 s/op
> Iteration 3: 5.736 s/op|Iteration 1: 4.083 s/op
> Iteration 2: 5.655 s/op
> Iteration 3: 3.998 s/op|{color:#ff0000}Iteration 1: 2.157 s/op{color}
> {color:#ff0000}Iteration 2: 2.201 s/op{color}
> {color:#ff0000}Iteration 3: 3.212 s/op{color}|
> |1G|Iteration 1: 13.814 s/op
> Iteration 2: 12.852 s/op
> Iteration 3: 13.480 s/op|Iteration 1: 9.619 s/op
> Iteration 2: 9.197 s/op
> Iteration 3: 8.694 s/op|{color:#ff0000}Iteration 1: 4.227 s/op{color}
> {color:#ff0000}Iteration 2: 4.234 s/op{color}
> {color:#ff0000}Iteration 3: 4.177 s/op{color}|
> |5G|Iteration 1: 136.621 s/op
> Iteration 2: 127.097 s/op
> Iteration 3: 139.694 s/op|Iteration 1: 39.612 s/op
> Iteration 2: 38.809 s/op
> Iteration 3: 39.125 s/op|{color:#ff0000}Iteration 1: 16.691 s/op{color}
> {color:#ff0000}Iteration 2: 16.599 s/op{color}
> {color:#ff0000}Iteration 3: 16.726 s/op{color}|
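> For example, in the 5G case with parallelism changing from 4 to 8, the mean
> recovery time drops from about 93.1 s/op with Write_Batch to about 10.8 s/op
> with Ingest_DB, i.e. roughly 8.6 times faster.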