[
https://issues.apache.org/jira/browse/FLINK-39136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18085042#comment-18085042
]
Yunfeng Zhou commented on FLINK-39136:
--------------------------------------
I am preparing the 1.20.5 patch release. There does not appear to be a
corresponding PR for the release-1.20 branch, and the PR for the master branch
has been inactive for 90 days, so I'll move this issue to the next patch
release. Feel free to reach out if you fix it.
> GS file system fails to retry on certain 503's due to old GCS lib dependency
> ----------------------------------------------------------------------------
>
> Key: FLINK-39136
> URL: https://issues.apache.org/jira/browse/FLINK-39136
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.18.0, 1.19.0, 1.20.0
> Reporter: Jon Chase
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.20.5
>
>
> We're running Flink 1.20.3 with the GCS file sink and we frequently get the
> following fatal error when GCS returns certain 503's:
>
> {code:java}
> java.io.IOException: Could not perform checkpoint 3568 for operator XXX ->
> XXX -> ... -> FileSink: Writer -> FileSink: Committer (1142/1536)#4.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1394)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: com.google.cloud.storage.StorageException: Unknown Error
> |> PUT
> https://storage.googleapis.com/upload/storage/v1/b/xxx-xxx-xxxx/o?name=.inprogress/xx-xx-xx/x-x-x/x/xxxxxxx-13.gz/xxxx&uploadType=resumable&upload_id=XXXXXXXXX
> |> content-range: bytes */*
> |> x-goog-gcs-idempotency-token: 421d97a0-dfef-4c8b-bd62-ccf6cd9e3c12
> |
> |< HTTP/1.1 503 Service Unavailable
> |< content-length: 0
> |< content-type: text/plain; charset=utf-8
> |< x-guploader-uploadid:
> AJRbA5WFzXVVcI8389tnO3SPL6WT67GZxMPLyD8hP-TwoHROmMMIZjW1vC85ULIebPXWwTFWEzE6bBc7GeBruYeIj-4LPfgqvW5ohCw5JtixAMw
> |
> at
> com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:185)
> at
> com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:117)
> at
> com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:106)
> at
> com.google.cloud.storage.JsonResumableSessionQueryTask.call(JsonResumableSessionQueryTask.java:128)
> at
> com.google.cloud.storage.JsonResumableSession.query(JsonResumableSession.java:57)
> at
> com.google.cloud.storage.JsonResumableSession.lambda$put$0(JsonResumableSession.java:73)
> at com.google.cloud.storage.Retrying.lambda$run$0(Retrying.java:102)
> at
> com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
> at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
> at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
> at com.google.cloud.storage.Retrying.run(Retrying.java:99)
> at
> com.google.cloud.storage.JsonResumableSession.put(JsonResumableSession.java:68)
> at
> com.google.cloud.storage.ApiaryUnbufferedWritableByteChannel.internalWrite(ApiaryUnbufferedWritableByteChannel.java:114)
> at
> com.google.cloud.storage.ApiaryUnbufferedWritableByteChannel.writeAndClose(ApiaryUnbufferedWritableByteChannel.java:65)
> at
> com.google.cloud.storage.UnbufferedWritableByteChannelSession$UnbufferedWritableByteChannel.writeAndClose(UnbufferedWritableByteChannelSession.java:40)
> at
> com.google.cloud.storage.DefaultBufferedWritableByteChannel.close(DefaultBufferedWritableByteChannel.java:167)
> at
> com.google.cloud.storage.StorageByteChannels$SynchronizedBufferedWritableByteChannel.close(StorageByteChannels.java:119)
> at
> com.google.cloud.storage.StorageException.wrapIOException(StorageException.java:184)
> at
> com.google.cloud.storage.BaseStorageWriteChannel.close(BaseStorageWriteChannel.java:84)
> at
> org.apache.flink.fs.gs.storage.GSBlobStorageImpl$WriteChannel.close(GSBlobStorageImpl.java:223)
> at
> org.apache.flink.fs.gs.writer.GSChecksumWriteChannel.close(GSChecksumWriteChannel.java:99)
> at
> org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeWriteChannelIfExists(GSRecoverableFsDataOutputStream.java:240)
> at
> org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.flush(GSRecoverableFsDataOutputStream.java:177)
> at java.base/java.util.zip.DeflaterOutputStream.flush(Unknown Source)
> at
> org.apache.hadoop.io.compress.GzipCodec$GzipOutputStream.flush(GzipCodec.java:107)
> at
> org.apache.flink.formats.compress.writers.HadoopCompressionBulkWriter.flush(HadoopCompressionBulkWriter.java:53)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:63)
> at
> org.apache.flink.connector.file.sink.writer.FileWriterBucket.closePartFile(FileWriterBucket.java:280)
> at
> org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:201)
> at
> org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:221)
> at
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:198)
> at
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:168)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:332)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$18(StreamTask.java:1437)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1425)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1382)
> ... 22 more
> Caused by: java.lang.NullPointerException
> at
> com.google.cloud.storage.JsonResumableSessionQueryTask.call(JsonResumableSessionQueryTask.java:117)
> ... 56 more{code}
>
>
> This NPE is thrown because of this bug in the GCS Java client library:
> [https://github.com/googleapis/java-storage/pull/2987]
>
> Upgrading to a newer release of the GCS library that contains the above fix
> resolves the issue in the Flink sink and allows the sink to correctly retry
> these 503's instead of hitting an NPE.
> I'd like to apply this in the 1.20.4 release, since we are running a 1.20.x
> version of Flink and don't plan to upgrade to 2.x anytime soon. That said,
> I'm also happy to apply the fix to the 2.x branch, if someone can tell me the
> appropriate branch to do it on.
>
> I've manually built the GCS file system connector locally with an updated
> version of the GCS library and validated that it addresses this issue.
>
> {code:xml}
> <!-- Old -->
> <fs.gs.sdk.version>2.29.1</fs.gs.sdk.version>
> <fs.gs.grpc.version>1.59.1</fs.gs.grpc.version>
>
> <!-- New -->
> <fs.gs.sdk.version>2.62.0</fs.gs.sdk.version>
> <fs.gs.grpc.version>1.76.2</fs.gs.grpc.version>{code}
>
>
> I'm happy to open PR(s) for this once there's alignment on the ticket itself!
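
For readers unfamiliar with the behaviour the issue asks for (retrying a 503
rather than failing the checkpoint), the following is a minimal, self-contained
sketch of retry-with-backoff on a retryable status code. It is an illustration
only and does not reproduce the internals of the GCS client library or of
Flink's GS file system. The class Retry503Sketch, the exception type
HttpStatusException, and the helper callWithRetries are hypothetical names
introduced for this example, and treating only 503 as retryable is an
assumption made to keep the sketch short.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class Retry503Sketch {

    /** Hypothetical stand-in for an HTTP-level failure; not a GCS library class. */
    static class HttpStatusException extends IOException {
        final int status;

        HttpStatusException(int status) {
            super("HTTP " + status);
            this.status = status;
        }
    }

    /** Assumption for this sketch: only 503 Service Unavailable is retryable. */
    static boolean isRetryable(Exception e) {
        return e instanceof HttpStatusException
                && ((HttpStatusException) e).status == 503;
    }

    /**
     * Runs the call up to maxAttempts times, doubling the sleep between
     * attempts. Non-retryable failures and the final failed attempt are
     * rethrown unchanged.
     */
    static <T> T callWithRetries(Callable<T> call, int maxAttempts, long initialBackoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                if (!isRetryable(e) || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(backoff);
                backoff *= 2;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated upload: fails with 503 twice, then succeeds.
        String result = callWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new HttpStatusException(503);
            }
            return "committed";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the failure reported above, the 503 surfaces as a StorageException caused by
a NullPointerException, so a retry loop of this kind never gets the chance to
treat it as a transient error. That is why the issue proposes upgrading the
library rather than adding retries on the Flink side.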