Jon Chase created FLINK-39136:
---------------------------------
Summary: GS file system fails to retry on certain 503's due to old
GCS lib dependency
Key: FLINK-39136
URL: https://issues.apache.org/jira/browse/FLINK-39136
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem
Affects Versions: 1.20.0, 1.19.0, 1.18.0
Reporter: Jon Chase
Fix For: 1.20.4
We're running Flink 1.20.3 with the GCS file sink and we frequently get the
following fatal error when GCS returns certain 503's:
{code:java}
java.io.IOException: Could not perform checkpoint 3568 for operator XXX -> XXX
-> ... -> FileSink: Writer -> FileSink: Committer (1142/1536)#4.
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1394)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
at
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.google.cloud.storage.StorageException: Unknown Error
|> PUT
https://storage.googleapis.com/upload/storage/v1/b/xxx-xxx-xxxx/o?name=.inprogress/xx-xx-xx/x-x-x/x/xxxxxxx-13.gz/xxxx&uploadType=resumable&upload_id=XXXXXXXXX
|> content-range: bytes */*
|> x-goog-gcs-idempotency-token: 421d97a0-dfef-4c8b-bd62-ccf6cd9e3c12
|
|< HTTP/1.1 503 Service Unavailable
|< content-length: 0
|< content-type: text/plain; charset=utf-8
|< x-guploader-uploadid:
AJRbA5WFzXVVcI8389tnO3SPL6WT67GZxMPLyD8hP-TwoHROmMMIZjW1vC85ULIebPXWwTFWEzE6bBc7GeBruYeIj-4LPfgqvW5ohCw5JtixAMw
|
at
com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:185)
at
com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:117)
at
com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:106)
at
com.google.cloud.storage.JsonResumableSessionQueryTask.call(JsonResumableSessionQueryTask.java:128)
at
com.google.cloud.storage.JsonResumableSession.query(JsonResumableSession.java:57)
at
com.google.cloud.storage.JsonResumableSession.lambda$put$0(JsonResumableSession.java:73)
at com.google.cloud.storage.Retrying.lambda$run$0(Retrying.java:102)
at
com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
at com.google.cloud.storage.Retrying.run(Retrying.java:99)
at
com.google.cloud.storage.JsonResumableSession.put(JsonResumableSession.java:68)
at
com.google.cloud.storage.ApiaryUnbufferedWritableByteChannel.internalWrite(ApiaryUnbufferedWritableByteChannel.java:114)
at
com.google.cloud.storage.ApiaryUnbufferedWritableByteChannel.writeAndClose(ApiaryUnbufferedWritableByteChannel.java:65)
at
com.google.cloud.storage.UnbufferedWritableByteChannelSession$UnbufferedWritableByteChannel.writeAndClose(UnbufferedWritableByteChannelSession.java:40)
at
com.google.cloud.storage.DefaultBufferedWritableByteChannel.close(DefaultBufferedWritableByteChannel.java:167)
at
com.google.cloud.storage.StorageByteChannels$SynchronizedBufferedWritableByteChannel.close(StorageByteChannels.java:119)
at
com.google.cloud.storage.StorageException.wrapIOException(StorageException.java:184)
at
com.google.cloud.storage.BaseStorageWriteChannel.close(BaseStorageWriteChannel.java:84)
at
org.apache.flink.fs.gs.storage.GSBlobStorageImpl$WriteChannel.close(GSBlobStorageImpl.java:223)
at
org.apache.flink.fs.gs.writer.GSChecksumWriteChannel.close(GSChecksumWriteChannel.java:99)
at
org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeWriteChannelIfExists(GSRecoverableFsDataOutputStream.java:240)
at
org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.flush(GSRecoverableFsDataOutputStream.java:177)
at java.base/java.util.zip.DeflaterOutputStream.flush(Unknown Source)
at
org.apache.hadoop.io.compress.GzipCodec$GzipOutputStream.flush(GzipCodec.java:107)
at
org.apache.flink.formats.compress.writers.HadoopCompressionBulkWriter.flush(HadoopCompressionBulkWriter.java:53)
at
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:63)
at
org.apache.flink.connector.file.sink.writer.FileWriterBucket.closePartFile(FileWriterBucket.java:280)
at
org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:201)
at
org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:221)
at
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:198)
at
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:168)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
at
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:332)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$18(StreamTask.java:1437)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1425)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1382)
... 22 more
Caused by: java.lang.NullPointerException
at
com.google.cloud.storage.JsonResumableSessionQueryTask.call(JsonResumableSessionQueryTask.java:117)
... 56 more{code}
This NPE is thrown b/c of this bug in the GCS connector:
[https://github.com/googleapis/java-storage/pull/2987]
Upgrading to a newer release of the GCS library that contains the above fix
resolves the issue in the Flink sink and allows the sink to correctly retry
these 503's instead of hitting an NPE.
I'd like to apply this in the 1.20.4 release, since we are running a 1.20.x
version of Flink and don't plan to upgrade to 2.x anytime soon. That said, I'm
also happy to apply the fix to the 2.x branch, if someone can tell me the
appropriate branch to do it on.
I've manually built the GCS file system connector locally with an updated
version of the GCS library and validated that it addresses this issue.
{code:java}
Old:
<fs.gs.sdk.version>2.29.1</fs.gs.sdk.version>
<fs.gs.grpc.version>1.59.1</fs.gs.grpc.version>
New:
<fs.gs.sdk.version>2.62.0</fs.gs.sdk.version>
<fs.gs.grpc.version>1.76.2</fs.gs.grpc.version>{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)