[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424625#comment-16424625 ]

Steven Zhen Wu edited comment on FLINK-9061 at 4/3/18 9:24 PM:
---------------------------------------------------------------

[~jgrier] [~StephanEwen]

Here is our thinking. If you think it makes sense, we can submit a PR for 
checkpoints. As Stephan mentioned earlier, savepoints probably need to be 
tackled separately.

1) New config to enable dynamic entropy injection:
{code:java}
# The user keeps full control over the checkpoint path (including entropy
# key placement); _ENTROPY_KEY_ can appear anywhere in the path.
state.backend.fs.checkpointdir: s3://bucket/_ENTROPY_KEY_/...

# boolean flag to enable entropy injection
state.backend.fs.checkpointdir.injectEntropy.enabled: true

# substring to replace with entropy
state.backend.fs.checkpointdir.injectEntropy.key: _ENTROPY_KEY_

# optional: number of entropy chars (default: 4)
state.backend.fs.checkpointdir.injectEntropy.length: 4
{code}
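
For illustration, here is a minimal sketch of how the substitution could be
implemented. The class and method names here are hypothetical, not existing
Flink APIs:
{code:java}
// Hypothetical sketch only; not an existing Flink class.
import java.util.concurrent.ThreadLocalRandom;

public final class EntropyInjector {

    /**
     * Replaces the entropy key in the configured checkpoint path with random
     * hex chars, e.g. "s3://bucket/_ENTROPY_KEY_/chk" -> "s3://bucket/a3f9/chk".
     */
    public static String injectEntropy(String checkpointPath, String entropyKey, int length) {
        if (!checkpointPath.contains(entropyKey)) {
            // no entropy key in the configured path, so nothing to substitute
            return checkpointPath;
        }
        return checkpointPath.replace(entropyKey, randomHex(length));
    }

    private static String randomHex(int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(Integer.toHexString(ThreadLocalRandom.current().nextInt(16)));
        }
        return sb.toString();
    }

    private EntropyInjector() {}
}
{code}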
 

2) Random vs. hash: we are generating random hex chars for the entropy. A 
hash should work equally well; I am not strongly biased either way, though I 
don't see much benefit of a hash over random chars, and a deterministic hash 
doesn't seem to buy much on its own.
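
For comparison, a deterministic variant along the lines suggested in the
issue description (a hash of the original path) could look like the
following. Again, this is just a hypothetical sketch:
{code:java}
// Hypothetical sketch: derive the prefix from a hash of the original path.
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public final class HashEntropy {

    /** Returns the first {@code length} hex chars of the MD5 of the original path. */
    public static String hashPrefix(String originalPath, int length) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(originalPath.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.substring(0, length);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 should always be available", e);
        }
    }

    private HashEntropy() {}
}
{code}
The deterministic variant makes the prefix reproducible across restarts, but
as noted above that doesn't obviously spread the S3 load any better than
random chars.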

3) Our current implementation does the entropy substitution during operator 
initialization. Conceptually, a better approach is probably to do the 
substitution on each S3 write; we can make that change if it is desired. 
Practically, it likely makes little difference for spreading the load and 
for throughput, because either way each operator gets its own entropy prefix.
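
To make the distinction concrete, the two options differ roughly as follows
(reusing the hypothetical EntropyInjector sketch from above):
{code:java}
// Contrast of the two substitution points; names are illustrative only.
public class SubstitutionTiming {
    public static void main(String[] args) {
        String configuredDir = "s3://bucket/_ENTROPY_KEY_/checkpoints";

        // (a) once, at operator initialization: the prefix is resolved a single
        // time, so all writes from this operator share one entropy prefix.
        String resolvedDir = EntropyInjector.injectEntropy(configuredDir, "_ENTROPY_KEY_", 4);
        String writeA1 = resolvedDir + "/chk-1/file-0";
        String writeA2 = resolvedDir + "/chk-1/file-1";

        // (b) per S3 write: the key is resolved freshly for every file, so
        // consecutive writes from the same operator can land under different prefixes.
        String writeB1 = EntropyInjector.injectEntropy(configuredDir, "_ENTROPY_KEY_", 4) + "/chk-1/file-0";
        String writeB2 = EntropyInjector.injectEntropy(configuredDir, "_ENTROPY_KEY_", 4) + "/chk-1/file-1";

        System.out.println(writeA1 + "\n" + writeA2);
        System.out.println(writeB1 + "\n" + writeB2);
    }
}
{code}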

Thanks,

Steven



> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-9061
>                 URL: https://issues.apache.org/jira/browse/FLINK-9061
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystem, State Backends, Checkpointing
>    Affects Versions: 1.4.2
>            Reporter: Jamie Grier
>            Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst-case scenario 
> for S3 performance, since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Has anyone else hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  


