Jon Chase created FLINK-39136:
---------------------------------

             Summary: GS file system fails to retry on certain 503's due to old 
GCS lib dependency
                 Key: FLINK-39136
                 URL: https://issues.apache.org/jira/browse/FLINK-39136
             Project: Flink
          Issue Type: Bug
          Components: Connectors / FileSystem
    Affects Versions: 1.20.0, 1.19.0, 1.18.0
            Reporter: Jon Chase
             Fix For: 1.20.4


We're running Flink 1.20.3 with the GCS file sink, and checkpoints frequently 
fail with the following fatal error when GCS returns certain 503 responses:

 
{code:java}
java.io.IOException: Could not perform checkpoint 3568 for operator XXX -> XXX -> ... -> FileSink: Writer -> FileSink: Committer (1142/1536)#4.
   at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1394)
   at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
   at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
   at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
   at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
   at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
   at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
   at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
   at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
   at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
   at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
   at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
   at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122)
   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
   at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
   at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
   at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.google.cloud.storage.StorageException: Unknown Error
   |> PUT https://storage.googleapis.com/upload/storage/v1/b/xxx-xxx-xxxx/o?name=.inprogress/xx-xx-xx/x-x-x/x/xxxxxxx-13.gz/xxxx&uploadType=resumable&upload_id=XXXXXXXXX
   |> content-range: bytes */*
   |> x-goog-gcs-idempotency-token: 421d97a0-dfef-4c8b-bd62-ccf6cd9e3c12
   | 
   |< HTTP/1.1 503 Service Unavailable
   |< content-length: 0
   |< content-type: text/plain; charset=utf-8
   |< x-guploader-uploadid: AJRbA5WFzXVVcI8389tnO3SPL6WT67GZxMPLyD8hP-TwoHROmMMIZjW1vC85ULIebPXWwTFWEzE6bBc7GeBruYeIj-4LPfgqvW5ohCw5JtixAMw
   | 
   at com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:185)
   at com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:117)
   at com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:106)
   at com.google.cloud.storage.JsonResumableSessionQueryTask.call(JsonResumableSessionQueryTask.java:128)
   at com.google.cloud.storage.JsonResumableSession.query(JsonResumableSession.java:57)
   at com.google.cloud.storage.JsonResumableSession.lambda$put$0(JsonResumableSession.java:73)
   at com.google.cloud.storage.Retrying.lambda$run$0(Retrying.java:102)
   at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
   at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
   at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
   at com.google.cloud.storage.Retrying.run(Retrying.java:99)
   at com.google.cloud.storage.JsonResumableSession.put(JsonResumableSession.java:68)
   at com.google.cloud.storage.ApiaryUnbufferedWritableByteChannel.internalWrite(ApiaryUnbufferedWritableByteChannel.java:114)
   at com.google.cloud.storage.ApiaryUnbufferedWritableByteChannel.writeAndClose(ApiaryUnbufferedWritableByteChannel.java:65)
   at com.google.cloud.storage.UnbufferedWritableByteChannelSession$UnbufferedWritableByteChannel.writeAndClose(UnbufferedWritableByteChannelSession.java:40)
   at com.google.cloud.storage.DefaultBufferedWritableByteChannel.close(DefaultBufferedWritableByteChannel.java:167)
   at com.google.cloud.storage.StorageByteChannels$SynchronizedBufferedWritableByteChannel.close(StorageByteChannels.java:119)
   at com.google.cloud.storage.StorageException.wrapIOException(StorageException.java:184)
   at com.google.cloud.storage.BaseStorageWriteChannel.close(BaseStorageWriteChannel.java:84)
   at org.apache.flink.fs.gs.storage.GSBlobStorageImpl$WriteChannel.close(GSBlobStorageImpl.java:223)
   at org.apache.flink.fs.gs.writer.GSChecksumWriteChannel.close(GSChecksumWriteChannel.java:99)
   at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeWriteChannelIfExists(GSRecoverableFsDataOutputStream.java:240)
   at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.flush(GSRecoverableFsDataOutputStream.java:177)
   at java.base/java.util.zip.DeflaterOutputStream.flush(Unknown Source)
   at org.apache.hadoop.io.compress.GzipCodec$GzipOutputStream.flush(GzipCodec.java:107)
   at org.apache.flink.formats.compress.writers.HadoopCompressionBulkWriter.flush(HadoopCompressionBulkWriter.java:53)
   at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:63)
   at org.apache.flink.connector.file.sink.writer.FileWriterBucket.closePartFile(FileWriterBucket.java:280)
   at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:201)
   at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:221)
   at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:198)
   at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:168)
   at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
   at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:332)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$18(StreamTask.java:1437)
   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1425)
   at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1382)
   ... 22 more
Caused by: java.lang.NullPointerException
   at com.google.cloud.storage.JsonResumableSessionQueryTask.call(JsonResumableSessionQueryTask.java:117)
   ... 56 more{code}
 

 

This NPE is thrown because of a bug in the GCS Java client library 
(java-storage), which is fixed by: 
[https://github.com/googleapis/java-storage/pull/2987]

 

Upgrading to a newer release of the GCS library that contains the above fix 
resolves the issue in the Flink sink: the sink correctly retries these 503 
responses instead of hitting the NPE.
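
To illustrate why the NPE defeats the retry logic, here is a simplified sketch of the failure mode as I understand it from the stack trace. It is not the actual java-storage code: every class and method name below (RetrySketch, UploadException, handleBuggy, handleFixed, and so on) is hypothetical, and which field is actually null in the library is an assumption. The point is only that an NPE raised while processing the 503 gets wrapped as a non-retryable "Unknown Error", so the retry loop gives up immediately.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

public class RetrySketch {
    /** Hypothetical stand-in for a storage exception that carries an HTTP status code. */
    static class UploadException extends RuntimeException {
        final int code;
        UploadException(int code, String message, Throwable cause) {
            super(message, cause);
            this.code = code;
        }
        // In this sketch only 503 is treated as retryable.
        boolean isRetryable() { return code == 503; }
    }

    /** Hypothetical response: a status code plus a body that may be null. */
    static class Response {
        final int status;
        final String body;
        Response(int status, String body) { this.status = status; this.body = body; }
    }

    // Buggy handler: dereferences the body without a null check. The resulting
    // NPE is wrapped as "Unknown Error" with code 0, which hides the 503.
    static String handleBuggy(Response r) {
        if (r.status == 200) return "ok";
        String detail;
        try {
            detail = r.body.trim(); // NPE when the 503 has no body
        } catch (RuntimeException e) {
            throw new UploadException(0, "Unknown Error", e);
        }
        throw new UploadException(r.status, detail, null);
    }

    // Fixed handler: tolerates a missing body, so the 503 status survives.
    static String handleFixed(Response r) {
        if (r.status == 200) return "ok";
        String detail = (r.body == null) ? "" : r.body.trim();
        throw new UploadException(r.status, detail, null);
    }

    // Minimal retry loop: retries only exceptions marked retryable.
    static String runWithRetries(Supplier<Response> server,
                                 Function<Response, String> handler,
                                 int maxAttempts) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        UploadException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return handler.apply(server.get());
            } catch (UploadException e) {
                if (!e.isRetryable()) throw e; // non-retryable: fail immediately
                last = e;
            }
        }
        throw last;
    }

    // Simulated server: one empty-bodied 503, then success.
    static Supplier<Response> flakyServer() {
        Iterator<Response> it =
            List.of(new Response(503, null), new Response(200, "done")).iterator();
        return it::next;
    }

    public static void main(String[] args) {
        // The fixed handler sees a retryable 503 and succeeds on the second attempt.
        System.out.println(runWithRetries(flakyServer(), RetrySketch::handleFixed, 3)); // prints "ok"
        // The buggy handler fails on the first attempt with a non-retryable error.
        try {
            runWithRetries(flakyServer(), RetrySketch::handleBuggy, 3);
        } catch (UploadException e) {
            System.out.println(e.getMessage()); // prints "Unknown Error"
        }
    }
}
```

In the sketch, the same transient 503 is either retried (fixed handler) or surfaces as a fatal error on the first attempt (buggy handler), which matches the "Unknown Error" caused by the NPE in the trace above.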


I'd like to apply this in the 1.20.4 release, since we are running a 1.20.x 
version of Flink and don't plan to upgrade to 2.x anytime soon. That said, I'm 
also happy to apply the fix to the 2.x branch if someone can tell me the 
appropriate branch to target.

 

I've manually built the GCS file system connector locally with an updated 
version of the GCS library and validated that it addresses this issue.

 
{code:xml}
<!-- Old: -->
<fs.gs.sdk.version>2.29.1</fs.gs.sdk.version>
<fs.gs.grpc.version>1.59.1</fs.gs.grpc.version>
 
<!-- New: -->
<fs.gs.sdk.version>2.62.0</fs.gs.sdk.version>
<fs.gs.grpc.version>1.76.2</fs.gs.grpc.version>{code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
