Xi Zhang created FLINK-38225:
--------------------------------

             Summary: GSBlobStorageImpl needs to add precondition check to make 
503 retryable
                 Key: FLINK-38225
                 URL: https://issues.apache.org/jira/browse/FLINK-38225
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / FileSystem
    Affects Versions: 1.20.0, 1.19.0, 1.18.0, 2.0.1
            Reporter: Xi Zhang


We discovered that storage.writer and compose are not retrying 503 errors. This 
happens quite frequently and causes checkpoint failures.

Here are our investigation findings:
 * We noticed frequent checkpoint failures due to GCS 503 errors; tuning retry 
settings didn't seem to take effect.
 * We upgraded the storage client to 2.52.1 and built a custom image. The new 
version revealed a more detailed error: 
 ** Suppressed: 
com.google.cloud.storage.RetryContext$RetryBudgetExhaustedComment: Unretryable 
error (attempts: 1, maxAttempts: 12, elapsed: PT0.683S, nextBackoff: 
PT1.173526768S, timeout: PT2M30S)

Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 
503 Service Unavailable

 * Upon checking with Google, we need to set preconditions in write and 
compose; otherwise the 503 error will be deemed not retryable.
 * The fix is as below:

{code:java}
Storage.BlobTargetOption precondition;
// Fetch the target blob once, so the existence check and the generation lookup
// use the same metadata (and we avoid a second RPC).
Blob targetBlob =
        storage.get(targetBlobIdentifier.bucketName, targetBlobIdentifier.objectName);
if (targetBlob == null) {
    // For a target object that does not yet exist, set the DoesNotExist precondition.
    // This will cause the request to fail if the object is created before the
    // request runs.
    precondition = Storage.BlobTargetOption.doesNotExist();
} else {
    // If the destination already exists in your bucket, instead set a generation-match
    // precondition. This will cause the request to fail if the existing object's
    // generation changes before the request runs.
    precondition = Storage.BlobTargetOption.generationMatch(targetBlob.getGeneration());
}
Storage.ComposeRequest request = builder.setTargetOptions(precondition).build();

storage.compose(request);
 {code}
The precondition needs to be added in compose: 
[https://github.com/apache/flink/blob/master/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java#L157]
 * A similar fix applies to write as well: 
[https://github.com/apache/flink/blob/master/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorageImpl.java#L64]
 * We no longer see checkpoint failures after the fix. 
 * We'd like to apply the fix to the open source code, since we don't want our 
image to diverge too much from the upstream version. I can help contribute 
if needed. 
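
For the write path, the fix would follow the same pattern as the compose 
example above, using Storage.BlobWriteOption instead of 
Storage.BlobTargetOption. A minimal sketch (not tested against the actual 
GSBlobStorageImpl code; the blobIdentifier field names here mirror the compose 
example and are illustrative):

{code:java}
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

BlobId blobId = BlobId.of(blobIdentifier.bucketName, blobIdentifier.objectName);
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();

// Fetch the blob once so the existence check and generation lookup are consistent.
Blob existing = storage.get(blobId);
Storage.BlobWriteOption precondition =
        (existing == null)
                // Fail the write if the object is created before the request runs.
                ? Storage.BlobWriteOption.doesNotExist()
                // Fail the write if the existing object's generation changes first.
                : Storage.BlobWriteOption.generationMatch(existing.getGeneration());

// With a precondition set, the client library treats 503 as retryable.
WriteChannel writer = storage.writer(blobInfo, precondition);
{code}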



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
