[
https://issues.apache.org/jira/browse/FLINK-17988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17118055#comment-17118055
]
Roman Khachatryan edited comment on FLINK-17988 at 5/27/20, 8:59 PM:
---------------------------------------------------------------------
From the thread stack traces it follows that fs.delete() on both filesystems
causes an object listing on S3:
{code:java}
at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:895)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.listPrefix(PrestoS3FileSystem.java:484)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:315)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:450)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:427)
at org.apache.flink.fs.s3presto.common.HadoopFileSystem.delete(HadoopFileSystem.java:147)
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:150)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:86)
at org.apache.flink.runtime.state.AbstractChannelStateHandle.discardState(AbstractChannelStateHandle.java:55)
at org.apache.flink.runtime.state.StateUtil$$Lambda$430/787068135.accept(Unknown Source)
at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:55)
at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:60)
at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:236)
at org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:132)
at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:263)
at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:218)
at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1005)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:912)
{code}
{code:java}
at org.apache.hadoop.fs.s3a.S3AFileSystem.listObjects(S3AFileSystem.java:1255)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2223)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.delete(HadoopFileSystem.java:147)
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:150)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:86)
at org.apache.flink.runtime.state.AbstractChannelStateHandle.discardState(AbstractChannelStateHandle.java:60)
at org.apache.flink.runtime.state.StateUtil$$Lambda$428/480418082.accept(Unknown Source)
at org.apache.flink.util.LambdaUtil.applyToAllWhileSuppressingExceptions(LambdaUtil.java:55)
at org.apache.flink.runtime.state.StateUtil.bestEffortDiscardAllStateObjects(StateUtil.java:60)
at org.apache.flink.runtime.checkpoint.OperatorSubtaskState.discardState(OperatorSubtaskState.java:236)
at org.apache.flink.runtime.checkpoint.OperatorState.discardState(OperatorState.java:132)
at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:263)
at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:218)
at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1005)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:912)
{code}
It is unclear why aligned mode is less affected (it lists objects too).
Triggering of new requests is blocked on the monitor that is held by
discardOnSubsume. This seems reasonable: otherwise checkpoints to discard would
pile up, and a triggered checkpoint wouldn't be able to start anyway.
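
The blocking described above can be illustrated with a small, self-contained sketch. It is not Flink code: the class CoordinatorLockSketch and its methods are hypothetical, and a ReentrantLock stands in for the coordinator monitor only so that contention can be observed with tryLock() instead of by blocking. A latch simulates a long-running discard of the subsumed checkpoint.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class CoordinatorLockSketch {
    // Stand-in for the coordinator monitor (assumption: the real code uses a monitor, not a ReentrantLock).
    private final ReentrantLock lock = new ReentrantLock();

    /** Hypothetical analogue of completing a checkpoint: discards the subsumed one while holding the lock. */
    void completeCheckpoint(Runnable discardSubsumed) {
        lock.lock();
        try {
            discardSubsumed.run(); // in the traces above, every delete also lists objects on S3
        } finally {
            lock.unlock();
        }
    }

    /** Hypothetical analogue of triggering: reports whether the lock could be taken right now. */
    boolean tryTriggerCheckpoint() {
        if (!lock.tryLock()) {
            return false; // the real trigger path would block here instead of returning
        }
        try {
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Returns {trigger result while discarding, trigger result after discarding}. */
    static boolean[] demo() throws InterruptedException {
        CoordinatorLockSketch coordinator = new CoordinatorLockSketch();
        CountDownLatch discardStarted = new CountDownLatch(1);
        CountDownLatch discardMayFinish = new CountDownLatch(1);

        Thread completer = new Thread(() -> coordinator.completeCheckpoint(() -> {
            discardStarted.countDown();
            try {
                discardMayFinish.await(); // simulates a slow loop of fs.delete() calls
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        completer.start();

        discardStarted.await();                             // the discard now holds the lock
        boolean during = coordinator.tryTriggerCheckpoint();
        discardMayFinish.countDown();                       // let the discard finish
        completer.join();
        boolean after = coordinator.tryTriggerCheckpoint();
        return new boolean[] {during, after};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = demo();
        System.out.println("trigger while discarding: " + result[0]);
        System.out.println("trigger after discarding: " + result[1]);
    }
}
```

In this sketch the trigger attempt fails for as long as the discard holds the lock and succeeds once it is released, which mirrors why a slow discardOnSubsume delays the next checkpoint.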
> Checkpointing slows down after reaching state.checkpoints.num-retained
> ----------------------------------------------------------------------
>
> Key: FLINK-17988
> URL: https://issues.apache.org/jira/browse/FLINK-17988
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.11.0
> Reporter: Roman Khachatryan
> Assignee: Roman Khachatryan
> Priority: Major
> Attachments: flink-conf.yaml, jobmanager.s3a.dmp, jobmanager.s3p.dmp
>
>
> With unaligned checkpoints, this always happens (a new checkpoint is never
> started or triggered).
> With aligned checkpoints, it happens to some degree, depending on state size
> and thresholds: checkpoints are delayed by 1 minute. The delay grows very
> slowly with state size.
>
> Filesystems: s3p and s3a
> Parallelism: 176, repartition (num stages): 5.
> Number of files in checkpoint is about 1K (depends on
> state.backend.fs.memory-threshold and their size). Size doesn't matter
> (100K..10G).