[
https://issues.apache.org/jira/browse/FLINK-36190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alexis Sarda-Espinosa updated FLINK-36190:
------------------------------------------
Description:
I have a job that uses RocksDB with incremental checkpointing, where the target
is Azure Blob File System. Due to network issues, the Hadoop stack is having
problems storing checkpoint files. This, of course, is not a problem in Flink
itself, but I have encountered a scenario that apparently left Flink's handles
inconsistent.
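For context, the checkpointing setup of the job is roughly equivalent to the following sketch (the abfs URI, the interval, and the class name are placeholders, not the exact values from my job):
{code:java}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with incremental checkpoints enabled
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // periodic checkpoints (placeholder interval)
        env.enableCheckpointing(60_000L);

        // checkpoints written to Azure Blob File System through the flink-azure-fs-hadoop plugin
        env.getCheckpointConfig().setCheckpointStorage(
                "abfs://<container>@<account>.dfs.core.windows.net/checkpoints");

        // ... sources, transformations, sinks and env.execute() omitted ...
    }
}
{code}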
First, I can see the following logs in the Task Manager:
{noformat}
2024-08-30 17:53:14,952 DEBUG [AsyncOperations-thread-25] org.apache.hadoop.fs.azurebfs.services.AbfsClient:311 - HttpRequestFailure: 0,...,PUT,https://.../checkpoints/035f85c3de3190faa0b129a2def22822/chk-148/6fa8b251-bb61-4a19-b5ae-93c1a885b5a7?resource=file&timeout=90, java.net.SocketException: Connection reset
2024-08-30 17:53:14,952 DEBUG [AsyncOperations-thread-25] org.apache.hadoop.fs.azurebfs.services.AbfsClient:221 - Retrying REST operation CreatePath. RetryCount = 1
2024-08-30 17:53:20,493 DEBUG [AsyncOperations-thread-25] org.apache.hadoop.fs.azurebfs.services.AbfsClient:323 - HttpRequest: CreatePath: 409,PathAlreadyExists,...,PUT,https://.../checkpoints/035f85c3de3190faa0b129a2def22822/chk-148/6fa8b251-bb61-4a19-b5ae-93c1a885b5a7?resource=file&timeout=90
{noformat}
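Purely as an illustration of what those log lines describe, here is a local-filesystem analogue (not actual Hadoop or Flink code): the first create evidently goes through on the server side, the response is lost, and the retried create then runs into the already-existing path.
{code:java}
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CreateRetryAnalogue {

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("chk-analogue");
        Path file = dir.resolve("6fa8b251-analogue");

        // Attempt 1: the create succeeds, but pretend the response was lost
        // (the "Connection reset" in the TM log), so the caller believes it failed.
        Files.createFile(file);

        // Attempt 2, i.e. the retry: the path already exists now.
        try {
            Files.createFile(file);
        } catch (FileAlreadyExistsException e) {
            // Local equivalent of the 409 PathAlreadyExists response; something above
            // the retry has to decide whether this is benign (our own earlier attempt)
            // or a real conflict, and clean up the file either way.
            System.out.println("Retry observed: " + e);
        }
    }
}
{code}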
As far as I can tell based on Hadoop's source code, this {{PathAlreadyExists}} error is neither retried nor handled there, so I guess it must be handled somewhere in Flink's stack. The issue is that the Job Manager later logs this:
{noformat}
2024-08-30 17:54:23,105 WARN [cluster-io-thread-8] org.apache.flink.runtime.checkpoint.CheckpointsCleaner:141 - Could not properly discard completed checkpoint 148.
org.apache.hadoop.fs.FileAlreadyExistsException: Operation failed: "The recursive query parameter value must be true to delete a non-empty directory.", 409, DELETE, https://.../checkpoints/035f85c3de3190faa0b129a2def22822/chk-148?timeout=90&recursive=false, DirectoryNotEmpty, "The recursive query parameter value must be true to delete a non-empty directory. RequestId:4767d0f8-c01f-001d-3905-fba443000000 Time:2024-08-30T17:54:23.0931772Z"
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1380) ~[?:?]
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.delete(AzureBlobFileSystem.java:473) ~[?:?]
    at org.apache.flink.fs.azure.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:160) ~[?:?]
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:155) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:74) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.checkpoint.CompletedCheckpoint$CompletedCheckpointDiscardObject.discard(CompletedCheckpoint.java:358) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanup$1(CheckpointsCleaner.java:139) ~[flink-dist-1.18.1.jar:1.18.1]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.base/java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException: Operation failed: "The recursive query parameter value must be true to delete a non-empty directory.", 409, DELETE, https://.../checkpoints/035f85c3de3190faa0b129a2def22822/chk-148?timeout=90&recursive=false, DirectoryNotEmpty, "The recursive query parameter value must be true to delete a non-empty directory. RequestId:4767d0f8-c01f-001d-3905-fba443000000 Time:2024-08-30T17:54:23.0931772Z"
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:231) ~[?:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:191) ~[?:?]
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:464) ~[?:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:189) ~[?:?]
    at org.apache.hadoop.fs.azurebfs.services.AbfsClient.deletePath(AbfsClient.java:742) ~[?:?]
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.delete(AzureBlobFileSystemStore.java:915) ~[?:?]
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.delete(AzureBlobFileSystem.java:470) ~[?:?]
    ... 8 more
{noformat}
And indeed, if I look at that path in the blob storage, I can see that the 6fa8b251-bb61-4a19-b5ae-93c1a885b5a7 file is there, which prevents the deletion of the folder.
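In case it helps anyone hitting the same situation, the leftover directory can be removed by hand with Hadoop's public FileSystem API, roughly as in the sketch below (this assumes hadoop-azure and the ABFS credentials are available; the account and container in the URI are placeholders):
{code:java}
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ManualCheckpointCleanup {

    public static void main(String[] args) throws Exception {
        // placeholder URI; the credentials for the storage account must be present in the Configuration
        URI root = URI.create("abfs://<container>@<account>.dfs.core.windows.net/");

        try (FileSystem fs = FileSystem.get(root, new Configuration())) {
            // path relative to the container root; adjust to the actual checkpoint base path
            Path chk = new Path("/checkpoints/035f85c3de3190faa0b129a2def22822/chk-148");
            // recursive=true is exactly what the failed cleanup was missing: a non-recursive
            // delete of a non-empty directory is rejected with DirectoryNotEmpty
            boolean deleted = fs.delete(chk, true);
            System.out.println("Deleted " + chk + ": " + deleted);
        }
    }
}
{code}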
I also find it odd that the file's size is 0 bytes, yet the checkpoint was marked as completed. Is it normal for some checkpoint files to be empty?
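For completeness, the remaining contents of the directory and their sizes can be confirmed through the same API (same assumptions and placeholders as the previous sketch):
{code:java}
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckpointFileInspection {

    public static void main(String[] args) throws Exception {
        URI root = URI.create("abfs://<container>@<account>.dfs.core.windows.net/");

        try (FileSystem fs = FileSystem.get(root, new Configuration())) {
            Path chk = new Path("/checkpoints/035f85c3de3190faa0b129a2def22822/chk-148");
            // list whatever is left in the checkpoint directory and print each entry's length;
            // the orphaned 6fa8b251-... file shows up here with 0 bytes
            for (FileStatus status : fs.listStatus(chk)) {
                System.out.println(status.getPath() + " -> " + status.getLen() + " bytes");
            }
        }
    }
}
{code}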
Side note: several Hadoop classes log under the same logger name; the entries in the first block above actually come from
{noformat}
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation{noformat}
> Completed checkpoint's file cannot be discarded due to network issues
> ---------------------------------------------------------------------
>
> Key: FLINK-36190
> URL: https://issues.apache.org/jira/browse/FLINK-36190
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.18.1
> Reporter: Alexis Sarda-Espinosa
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)