[ https://issues.apache.org/jira/browse/FLINK-39136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jon Chase updated FLINK-39136:
------------------------------
Description:
We're running Flink 1.20.3 with the GCS file sink and we frequently hit the
following fatal error when GCS returns certain 503 responses:
{code:java}
java.io.IOException: Could not perform checkpoint 3568 for operator XXX -> XXX -> ... -> FileSink: Writer -> FileSink: Committer (1142/1536)#4.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1394)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.google.cloud.storage.StorageException: Unknown Error
    |> PUT https://storage.googleapis.com/upload/storage/v1/b/xxx-xxx-xxxx/o?name=.inprogress/xx-xx-xx/x-x-x/x/xxxxxxx-13.gz/xxxx&uploadType=resumable&upload_id=XXXXXXXXX
    |> content-range: bytes */*
    |> x-goog-gcs-idempotency-token: 421d97a0-dfef-4c8b-bd62-ccf6cd9e3c12
    |
    |< HTTP/1.1 503 Service Unavailable
    |< content-length: 0
    |< content-type: text/plain; charset=utf-8
    |< x-guploader-uploadid: AJRbA5WFzXVVcI8389tnO3SPL6WT67GZxMPLyD8hP-TwoHROmMMIZjW1vC85ULIebPXWwTFWEzE6bBc7GeBruYeIj-4LPfgqvW5ohCw5JtixAMw
    |
    at com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:185)
    at com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:117)
    at com.google.cloud.storage.JsonResumableSessionFailureScenario.toStorageException(JsonResumableSessionFailureScenario.java:106)
    at com.google.cloud.storage.JsonResumableSessionQueryTask.call(JsonResumableSessionQueryTask.java:128)
    at com.google.cloud.storage.JsonResumableSession.query(JsonResumableSession.java:57)
    at com.google.cloud.storage.JsonResumableSession.lambda$put$0(JsonResumableSession.java:73)
    at com.google.cloud.storage.Retrying.lambda$run$0(Retrying.java:102)
    at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
    at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
    at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
    at com.google.cloud.storage.Retrying.run(Retrying.java:99)
    at com.google.cloud.storage.JsonResumableSession.put(JsonResumableSession.java:68)
    at com.google.cloud.storage.ApiaryUnbufferedWritableByteChannel.internalWrite(ApiaryUnbufferedWritableByteChannel.java:114)
    at com.google.cloud.storage.ApiaryUnbufferedWritableByteChannel.writeAndClose(ApiaryUnbufferedWritableByteChannel.java:65)
    at com.google.cloud.storage.UnbufferedWritableByteChannelSession$UnbufferedWritableByteChannel.writeAndClose(UnbufferedWritableByteChannelSession.java:40)
    at com.google.cloud.storage.DefaultBufferedWritableByteChannel.close(DefaultBufferedWritableByteChannel.java:167)
    at com.google.cloud.storage.StorageByteChannels$SynchronizedBufferedWritableByteChannel.close(StorageByteChannels.java:119)
    at com.google.cloud.storage.StorageException.wrapIOException(StorageException.java:184)
    at com.google.cloud.storage.BaseStorageWriteChannel.close(BaseStorageWriteChannel.java:84)
    at org.apache.flink.fs.gs.storage.GSBlobStorageImpl$WriteChannel.close(GSBlobStorageImpl.java:223)
    at org.apache.flink.fs.gs.writer.GSChecksumWriteChannel.close(GSChecksumWriteChannel.java:99)
    at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeWriteChannelIfExists(GSRecoverableFsDataOutputStream.java:240)
    at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.flush(GSRecoverableFsDataOutputStream.java:177)
    at java.base/java.util.zip.DeflaterOutputStream.flush(Unknown Source)
    at org.apache.hadoop.io.compress.GzipCodec$GzipOutputStream.flush(GzipCodec.java:107)
    at org.apache.flink.formats.compress.writers.HadoopCompressionBulkWriter.flush(HadoopCompressionBulkWriter.java:53)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:63)
    at org.apache.flink.connector.file.sink.writer.FileWriterBucket.closePartFile(FileWriterBucket.java:280)
    at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:201)
    at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:221)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:198)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:168)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:332)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$18(StreamTask.java:1437)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1425)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1382)
    ... 22 more
Caused by: java.lang.NullPointerException
    at com.google.cloud.storage.JsonResumableSessionQueryTask.call(JsonResumableSessionQueryTask.java:117)
    ... 56 more
{code}
This NPE is thrown because of a bug in the GCS Java client library
(java-storage), which is fixed by this PR:
[https://github.com/googleapis/java-storage/pull/2987]

Upgrading to a newer release of the GCS library that contains the above fix
resolves the issue in the Flink sink and allows the sink to correctly retry
these 503s instead of hitting an NPE.
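
To illustrate the behavior expected once the 503 is surfaced as a retryable error rather than masked by an NPE, here is a minimal sketch of retry-with-backoff on a 503. It is not the java-storage or Flink implementation: the class `RetryOn503Sketch`, the exception `HttpStatusException`, the set of retryable status codes, and the backoff parameters are all hypothetical and chosen only for illustration.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical sketch (not Flink or java-storage code): retry a call while it
 * fails with a retryable HTTP status such as 503, doubling the wait each time.
 */
final class RetryOn503Sketch {

    /** Hypothetical exception carrying the HTTP status of a failed request. */
    static final class HttpStatusException extends Exception {
        final int status;

        HttpStatusException(int status) {
            super("HTTP " + status);
            this.status = status;
        }
    }

    /** Assumption for this sketch: these status codes are treated as transient. */
    static boolean isRetryable(int status) {
        return status == 408 || status == 429
                || status == 500 || status == 502 || status == 503 || status == 504;
    }

    /** Runs the call up to maxAttempts times, sleeping between retryable failures. */
    static <T> T callWithRetries(Callable<T> call, int maxAttempts, long initialBackoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (HttpStatusException e) {
                // Give up on non-retryable statuses or when attempts are exhausted.
                if (!isRetryable(e.status) || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(backoff);
                backoff *= 2;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated upload: fails with 503 twice, then succeeds.
        String result = callWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new HttpStatusException(503);
            }
            return "committed";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

The point of the sketch is only that the failure has to reach the retry logic as a recognizable 503. In the trace above the cause is an NPE, so the failure is never retried and the checkpoint fails.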

I'd like to apply this in the 1.20.4 release, since we are running a 1.20.x
version of Flink and don't plan to upgrade to 2.x anytime soon. That said, I'm
also happy to apply the fix to the 2.x line, if someone can tell me the
appropriate branch to do it on.

I've built the GCS file system connector locally with an updated
version of the GCS library and validated that it addresses this issue.
{code:xml}
Old:
<fs.gs.sdk.version>2.29.1</fs.gs.sdk.version>
<fs.gs.grpc.version>1.59.1</fs.gs.grpc.version>

New:
<fs.gs.sdk.version>2.62.0</fs.gs.sdk.version>
<fs.gs.grpc.version>1.76.2</fs.gs.grpc.version>
{code}
I'm happy to open PR(s) for this once there's alignment on the ticket itself!
> GS file system fails to retry on certain 503's due to old GCS lib dependency
> ----------------------------------------------------------------------------
>
> Key: FLINK-39136
> URL: https://issues.apache.org/jira/browse/FLINK-39136
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.18.0, 1.19.0, 1.20.0
> Reporter: Jon Chase
> Priority: Major
> Fix For: 1.20.4