[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414828#comment-16414828 ]
Jamie Grier edited comment on FLINK-9061 at 3/27/18 12:45 AM:
--------------------------------------------------------------
[[email protected]]
Here's the full stack trace:
{quote}"java.lang.Exception: Could not perform checkpoint 465 for operator
catch_all.reduce -> (catch_all.late, catch_all.metric) (113\/256).
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:569)
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:380)
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:283)
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:185)
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 465 for operator
catch_all.reduce -> (catch_all.late, catch_all.metric) (113/256).
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:560)
... 8 common frames omitted
Caused by: java.io.IOException: Could not flush and close the file system
output stream to
s3://BUCKET/SERVICE/flink/checkpoints/fda9a54de951d8cf7a55b5cd833cb0f7/chk-465/02896cca-0d9c-4295-9d30-b0a3b7cc928b
in order to obtain the stream state handle
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:126)
org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:359)
... 13 common frames omitted
Caused by: java.io.IOException:
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
We encountered an internal error. Please try again. (Service: Amazon S3;
Status Code: 500; Error Code: InternalError; Request ID: BLAH), S3 Extended
Request ID: BLAH
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.uploadObject(PrestoS3FileSystem.java:1036)
org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:987)
org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
org.apache.flink.fs.s3presto.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
... 18 common frames omitted
Caused by:
org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
We encountered an internal error. Please try again. (Service: Amazon S3;
Status Code: 500; Error Code: InternalError; Request ID: BLAH)
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1587)
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1257)
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1029)
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:741)
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:715)
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:697)
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:665)
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:647)
org.apache.flink.fs.s3presto.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:511)
{quote}
> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -----------------------------------------------------------------------------
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
> Issue Type: Bug
> Components: FileSystem, State Backends, Checkpointing
> Affects Versions: 1.4.2
> Reporter: Jamie Grier
> Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale
> jobs (those with many total tasks). The issue is that we write all of the
> checkpoint data under a common key prefix. This is the worst-case scenario
> for S3 performance, since S3 uses the leading bytes of the key as its
> partition key.
>
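> To make the prefix problem concrete (the first key is the real one from the
> stack trace above; the other two are illustrative placeholders), every file
> of every checkpoint shares the same leading key bytes:
>
> {noformat}
> s3://BUCKET/SERVICE/flink/checkpoints/fda9a54de951d8cf7a55b5cd833cb0f7/chk-465/02896cca-0d9c-4295-9d30-b0a3b7cc928b
> s3://BUCKET/SERVICE/flink/checkpoints/fda9a54de951d8cf7a55b5cd833cb0f7/chk-465/<another-subtask-file>
> s3://BUCKET/SERVICE/flink/checkpoints/fda9a54de951d8cf7a55b5cd833cb0f7/chk-466/<next-checkpoint-file>
> {noformat}
>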
> In the worst case, checkpoints fail with a 500 status code coming back from
> S3 and an internal error type of TooBusyException.
>
> One possible solution would be to add a hook in the Flink filesystem code
> that allows me to "rewrite" paths. For example, say I have the checkpoint
> directory set to:
>
> s3://bucket/flink/checkpoints
>
> I would hook that and rewrite the path to:
>
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original
> path.
>
> This would distribute the checkpoint write load around the S3 cluster evenly.
>
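> As a sketch of the rewrite itself (the hook point and the class below are
> hypothetical, and Java's String.hashCode() is just a stand-in for any stable
> hash function):
>
> {code:java}
> import java.net.URI;
>
> public final class EntropyPathRewriter {
>
>     /**
>      * Rewrites s3://bucket/some/path to s3://bucket/[HASH]/some/path, where
>      * HASH is derived from the original path. Moving high-entropy bytes to
>      * the front of the key lets S3 spread the writes across partitions.
>      */
>     public static URI rewrite(URI checkpointDir) {
>         String originalKey = checkpointDir.getPath();              // e.g. "/flink/checkpoints"
>         String hash = Integer.toHexString(originalKey.hashCode()); // stand-in for a stable hash
>         return checkpointDir.resolve("/" + hash + originalKey);   // same bucket, hashed prefix
>     }
>
>     public static void main(String[] args) {
>         URI configured = URI.create("s3://bucket/flink/checkpoints");
>         System.out.println(rewrite(configured)); // s3://bucket/<hash>/flink/checkpoints
>     }
> }
> {code}
>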
> For reference:
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>
> Has anyone else hit this issue? Any other ideas for solutions? This is a
> pretty serious problem for people trying to checkpoint to S3.
>
> -Jamie
>