[
https://issues.apache.org/jira/browse/FLINK-36421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-36421:
-----------------------------------
Labels: pull-request-available (was: )
> Missing fsync in FsCheckpointStreamFactory
> ------------------------------------------
>
> Key: FLINK-36421
> URL: https://issues.apache.org/jira/browse/FLINK-36421
> Project: Flink
> Issue Type: Bug
> Components: FileSystems, Runtime / Checkpointing
> Affects Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0
> Reporter: Marc Aurel Fritz
> Priority: Critical
> Labels: pull-request-available
> Attachments: fsync-fs-stream-factory.diff
>
>
> With Flink 1.20 we observed another checkpoint corruption bug. This is
> similar to FLINK-35217, but affects only files written by the taskmanager
> (the ones with random names as described
> [here|https://github.com/Planet-X/flink/blob/0d6e25a9738d9d4ee94de94e1437f92611b50758/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java#L47]).
> After a system crash the files written by the taskmanager may be corrupted
> (file size of 0 bytes) if the changes in the file-system cache haven't been
> written to disk. The "_metadata" file written by the jobmanager is always
> fine because it's properly fsynced.
> Investigation revealed that "fsync" is missing, this time in
> "FsCheckpointStreamFactory". In this case the "OutputStream" is closed
> without calling "fsync", thus the file is not durably persisted on disk
> before the checkpoint is completed. (As previously established in
> FLINK-35217, calling "fsync" is necessary because simply closing the stream
> does not guarantee persistence.)
> "strace" on the taskmanager's process confirms this behavior:
> # The checkpoint chk-1217's directory is created at "mkdir"
> # The checkpoint chk-1217's non-inline state is written by the taskmanager
> at "openat", filename is "0507881e-8877-40b0-82d6-3d7dead64ccc". Note that
> there's no "fsync" before "close".
> # The checkpoint chk-1217 is finished, its "_metadata" is written and synced
> properly
> # The old checkpoint chk-1216 is deleted at "unlink"
> The new checkpoint chk-1217 now references an unsynced file that can be
> corrupted by, e.g., a power loss. This means there is no working checkpoint
> left, as the old checkpoint has already been deleted.
> Durable persistence requires an "fsync" call before "close" in step 2.
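> The four steps, with the sync in step 2 included, can be sketched in plain
> Java roughly as follows. This is an illustration only, not Flink code: the
> class and method names, the fixed file name "state" (Flink uses a random
> name) and the use of "FileChannel.force(true)" as the "fsync" equivalent
> are assumptions made for the example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

final class CheckpointOrderSketch {

    /** Writes data and forces it to the storage device before the channel is closed. */
    static void writeAndSync(Path file, byte[] data) throws IOException {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buf = ByteBuffer.wrap(data);
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            ch.force(true); // sync file content and metadata before close
        }
    }

    static void completeCheckpoint(Path root, long id, byte[] state, byte[] metadata)
            throws IOException {
        // Step 1: create the checkpoint directory.
        Path dir = Files.createDirectories(root.resolve("chk-" + id));
        // Step 2: write the state file and sync it (the call missing in the report).
        writeAndSync(dir.resolve("state"), state);
        // Step 3: write and sync the metadata under a temporary name, then rename it.
        Path tmp = dir.resolve("._metadata.inprogress");
        writeAndSync(tmp, metadata);
        Files.move(tmp, dir.resolve("_metadata"), StandardCopyOption.ATOMIC_MOVE);
        // Step 4: only now delete the previous checkpoint, if there is one.
        Path old = root.resolve("chk-" + (id - 1));
        if (Files.isDirectory(old)) {
            try (DirectoryStream<Path> entries = Files.newDirectoryStream(old)) {
                for (Path entry : entries) {
                    Files.delete(entry);
                }
            }
            Files.delete(old);
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("statestore-");
        completeCheckpoint(root, 1, new byte[] {1, 2, 3}, new byte[] {9});
        completeCheckpoint(root, 2, new byte[] {4, 5, 6}, new byte[] {8});
        System.out.println("chk-1 exists: " + Files.exists(root.resolve("chk-1")));
        System.out.println("chk-2 exists: " + Files.exists(root.resolve("chk-2")));
    }
}
```

> The point of the ordering is that the old checkpoint is removed only after
> every file of the new one has been synced.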
> Full "strace" log:
> {code:java}
> [pid 947250] 08:22:58
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217",
> 0x7f68414c5b50) = -1 ENOENT (No such file or directory)
> [pid 947250] 08:22:58
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217",
> 0x7f68414c5b50) = -1 ENOENT (No such file or directory)
> [pid 947250] 08:22:58
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502",
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:58
> mkdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217",
> 0777) = 0
> [pid 1303248] 08:22:59
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc",
> 0x7f56f08d5610) = -1 ENOENT (No such file or directory)
> [pid 1303248] 08:22:59
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217",
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 1303248] 08:22:59 openat(AT_FDCWD,
> "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc",
> O_WRONLY|O_CREAT|O_TRUNC, 0666) = 199
> [pid 1303248] 08:22:59 fstat(199, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
> [pid 1303248] 08:22:59 close(199) = 0
> [pid 947310] 08:22:59
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata",
> 0x7f683fb378b0) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata",
> 0x7f683fb37730) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
> 0x7f683fb37730) = -1 ENOENT (No such file or directory)
> [pid 947310] 08:22:59
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217",
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947310] 08:22:59
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217",
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947310] 08:22:59 openat(AT_FDCWD,
> "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
> O_WRONLY|O_CREAT|O_EXCL, 0666) = 148
> [pid 947310] 08:22:59 fsync(148) = 0
> [pid 947310] 08:22:59 close(148) = 0
> [pid 947310] 08:22:59
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
> {st_mode=S_IFREG|0644, st_size=46265, ...}) = 0
> [pid 947310] 08:22:59
> rename("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
> "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata")
> = 0
> [pid 947310] 08:22:59
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe",
> <unfinished ...>
> [pid 947250] 08:22:59
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata",
> <unfinished ...>
> [pid 947310] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=54409,
> ...}) = 0
> [pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=46265,
> ...}) = 0
> [pid 947310] 08:22:59
> unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe"
> <unfinished ...>
> [pid 947250] 08:22:59
> unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata"
> <unfinished ...>
> [pid 947310] 08:22:59 <... unlink resumed>) = 0
> [pid 947250] 08:22:59 <... unlink resumed>) = 0
> [pid 947250] 08:22:59
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216",
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:59
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216",
> {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
> [pid 947250] 08:22:59 openat(AT_FDCWD,
> "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216",
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 148
> [pid 947250] 08:22:59 newfstatat(148, "", {st_mode=S_IFDIR|0755,
> st_size=4096, ...}, AT_EMPTY_PATH) = 0
> [pid 947250] 08:22:59 close(148) = 0
> [pid 947250] 08:22:59
> stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216",
> <unfinished ...>
> [pid 947201] 08:22:59 <... stat resumed>0x7f56f2069a20) = -1 ENOENT (No such
> file or directory)
> [pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFDIR|0755, st_size=4096,
> ...}) = 0
> [pid 947250] 08:22:59 openat(AT_FDCWD,
> "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216",
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY <unfinished ...>
> [pid 947250] 08:22:59 <... openat resumed>) = 148
> [pid 947250] 08:22:59 newfstatat(148, "", <unfinished ...>
> [pid 947250] 08:22:59 <... newfstatat resumed>{st_mode=S_IFDIR|0755,
> st_size=4096, ...}, AT_EMPTY_PATH) = 0
> [pid 947250] 08:22:59 close(148) = 0
> [pid 947250] 08:22:59
> unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") =
> -1 EISDIR (Is a directory)
> [pid 947250] 08:22:59
> rmdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = 0
> {code}
> Calling "sync()" when closing the stream in
> "FsCheckpointStreamFactory::closeAndGetHandle" fixes the problem by syncing
> the serialized state files before returning their reference to the
> jobmanager. The following commit fixes this:
> [https://github.com/Planet-X/flink/commit/0d6e25a9738d9d4ee94de94e1437f92611b50758]
> Diff is also attached.
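> The attached diff is not reproduced here. As a rough illustration of the
> idea only, the following self-contained sketch shows a stream whose
> "closeAndGetHandle" syncs the file before returning a reference to it. The
> class "StateOutputStream" and the use of a "Path" as the handle are
> hypothetical stand-ins and do not mirror Flink's actual classes.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

final class SyncOnCloseSketch {

    /** Hypothetical stand-in for a checkpoint state output stream; not Flink's class. */
    static final class StateOutputStream extends OutputStream {
        private final Path path;
        private final FileOutputStream out;
        private boolean closed;

        StateOutputStream(Path path) throws IOException {
            this.path = path;
            this.out = new FileOutputStream(path.toFile());
        }

        @Override
        public void write(int b) throws IOException {
            out.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            out.write(b, off, len);
        }

        /** Syncs and closes the file, and only then hands out its path as the "handle". */
        Path closeAndGetHandle() throws IOException {
            if (closed) {
                throw new IOException("Stream has already been closed.");
            }
            closed = true;
            try {
                out.flush();
                out.getFD().sync(); // fsync before close
            } finally {
                out.close();
            }
            return path;
        }

        /** Plain close() discards the stream without promising durability. */
        @Override
        public void close() throws IOException {
            if (!closed) {
                closed = true;
                out.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("state-", ".bin");
        StateOutputStream stream = new StateOutputStream(file);
        stream.write(new byte[] {1, 2, 3, 4});
        Path handle = stream.closeAndGetHandle();
        System.out.println(Files.size(handle) + " bytes behind " + handle);
    }
}
```

> If the sync fails, the exception propagates and no handle is returned, so a
> caller never receives a reference to a file that was not synced.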
> "strace" confirms that "fsync" is now called before the taskmanager's state
> file is closed, see the "fsync(268)" call:
> {code:java}
> [pid 108807] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263",
> 0x7f2c167fc890) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263",
> 0x7f2c167fc890) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc",
> {st_mode=S_IFDIR|0755, st_size=44, ...}) = 0
> [pid 108807] 13:14:59
> mkdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0777)
> = 0
> [pid 110456] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222",
> 0x7f7d56efbe90) = -1 ENOENT (No such file or directory)
> [pid 110456] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263",
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 110456] 13:14:59 openat(AT_FDCWD,
> "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222",
> O_WRONLY|O_CREAT|O_TRUNC, 0666) = 268
> [pid 110456] 13:14:59 fstat(268, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
> [pid 110456] 13:14:59 fsync(268) = 0
> [pid 110456] 13:14:59 close(268) = 0
> [pid 108807] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata",
> 0x7f2c167fc710) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata",
> <unfinished ...>
> [pid 108807] 13:14:59 <... stat resumed>0x7f2c167fc670) = -1 ENOENT (No such
> file or directory)
> [pid 108807] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
> 0x7f2c167fc670) = -1 ENOENT (No such file or directory)
> [pid 108807] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263",
> {st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
> [pid 108807] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263",
> {st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
> [pid 108807] 13:14:59 openat(AT_FDCWD,
> "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
> O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
> [pid 108807] 13:14:59 fsync(168) = 0
> [pid 108807] 13:14:59 close(168) = 0
> [pid 108807] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
> {st_mode=S_IFREG|0644, st_size=21416, ...}) = 0
> [pid 108807] 13:14:59
> rename("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
> "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata")
> = 0
> [pid 108823] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata",
> {st_mode=S_IFREG|0644, st_size=36684, ...}) = 0
> [pid 108823] 13:14:59
> unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata")
> = 0
> [pid 108823] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262",
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262",
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 openat(AT_FDCWD,
> "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262",
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0,
> ...}, AT_EMPTY_PATH) = 0
> [pid 108823] 13:14:59 close(168) = 0
> [pid 108823] 13:14:59
> stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262",
> {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
> [pid 108823] 13:14:59 openat(AT_FDCWD,
> "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262",
> O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
> [pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0,
> ...}, AT_EMPTY_PATH) = 0
> [pid 108823] 13:14:59 close(168) = 0
> [pid 108823] 13:14:59
> unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = -1
> EISDIR (Is a directory)
> [pid 108823] 13:14:59
> rmdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = 0
> {code}
> Presumably only checkpoints with larger state sizes are affected, as small
> state is inlined into the "_metadata" file, which has been properly persisted
> since Flink 1.19.1 due to FLINK-35217.
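> The size dependence can be pictured with the following sketch, which is an
> assumption-laden illustration rather than Flink's implementation: state at
> or below a threshold is returned as inline bytes (and would be persisted as
> part of "_metadata"), while larger state goes to its own randomly named
> file, which is the path that needs the "fsync". The names
> "InlineThresholdSketch", "Handle" and "persist", the "<=" comparison and
> the threshold value used in "main" are all hypothetical.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

final class InlineThresholdSketch {

    /** Result of persisting one piece of state: exactly one of the two fields is set. */
    static final class Handle {
        final byte[] inlineBytes; // non-null when the state is carried inline
        final Path file;          // non-null when the state lives in its own file

        Handle(byte[] inlineBytes, Path file) {
            this.inlineBytes = inlineBytes;
            this.file = file;
        }

        boolean isInline() {
            return inlineBytes != null;
        }
    }

    static Handle persist(byte[] state, int thresholdBytes, Path checkpointDir)
            throws IOException {
        if (state.length <= thresholdBytes) {
            // Small state: no separate file is written at all.
            return new Handle(state.clone(), null);
        }
        // Large state: written to a randomly named file and synced before close.
        Path file = checkpointDir.resolve(UUID.randomUUID().toString());
        try (FileOutputStream out = new FileOutputStream(file.toFile())) {
            out.write(state);
            out.getFD().sync();
        }
        return new Handle(null, file);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("chk-");
        int threshold = 1024; // hypothetical value chosen for the example
        System.out.println("small inline: " + persist(new byte[100], threshold, dir).isInline());
        System.out.println("large inline: " + persist(new byte[4096], threshold, dir).isInline());
    }
}
```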