[
https://issues.apache.org/jira/browse/FLINK-36530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890860#comment-17890860
]
Gabor Somogyi edited comment on FLINK-36530 at 10/18/24 1:45 PM:
-----------------------------------------------------------------
[{{94c3b86}}|https://github.com/apache/flink/commit/94c3b86a368d545a0aa3ff6b5c42f6f8ec3e11de]
on master
[{{60cba35}}|https://github.com/apache/flink/commit/60cba350d7638592ea771dc7cf512798e6248886]
on release-1.20
[{{21f79d1}}|https://github.com/apache/flink/commit/21f79d1e0a6a8dbadff8cad1e7785610572b191f]
on release-1.19
[{{a38396f}}|https://github.com/apache/flink/commit/a38396fbceaa88992103b79feff71acd7b83e54b]
on release-1.18
was (Author: gaborgsomogyi):
[{{94c3b86}}|https://github.com/apache/flink/commit/94c3b86a368d545a0aa3ff6b5c42f6f8ec3e11de]
on master
> Not able to restore list state from S3
> --------------------------------------
>
> Key: FLINK-36530
> URL: https://issues.apache.org/jira/browse/FLINK-36530
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 2.0.0, 1.18.2, 1.20.0, 1.19.1
> Reporter: Gabor Somogyi
> Assignee: Gabor Somogyi
> Priority: Blocker
> Labels: pull-request-available
>
> FLINK-34063 has fixed an important issue with compacted state but introduced
> super slow state recovery for both non-compacted and compacted list states
> from S3.
> Short statement: ~6Mb list state generated from
> {code:java}
> org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator{code}
> restore time is ~62 hours.
> Detailed analysis:
> During file sink compaction CompactCoordinator with parallelism 1 is
> collecting the file list which needs to be compacted (and writes them into
> the state). In the problematic scenario the list list size was ~15k entries.
> OperatorStateRestoreOperation.deserializeOperatorStateValues gets an offset
> for each and every list entry and does basically the following:
> {code:java}
> for (long offset : offsets) {
> in.seek(offset);
> stateListForName.add(serializer.deserialize(div));
> }{code}
> CompressibleFSDataInputStream.seek has introduced the following code:
> {code:java}
> final int available = compressingDelegate.available();
> if (available > 0) {
> if (available != compressingDelegate.skip(available)) {
> throw new IOException("Unable to skip buffered data.");
> }
> }
> {code}
> There are 2 problems with the mentioned code part:
> * The skip operation is not needed for uncompressed state
> * skip takes ~15 seconds for ~6Mb in case of S3 (which ends up in ~62 hours
> restore time)
> We've already addressed the first issue with a simple if condition but the
> second is definitely a harder one. Until the latter is not resolved I would
> say that compressed state is not a good choice together with S3 and list
> restoral.
> Steps to reproduce:
> * Create a list operator state with several thousand entries
> * Put it to S3
> * Try to restore it from Flink
--
This message was sent by Atlassian Jira
(v8.20.10#820010)