[
https://issues.apache.org/jira/browse/FLINK-15805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kostas Kloudas reassigned FLINK-15805:
--------------------------------------
Assignee: Kostas Kloudas
> StreamingFileSink on S3 fails when firing processing time timers after
> recovery.
> --------------------------------------------------------------------------------
>
> Key: FLINK-15805
> URL: https://issues.apache.org/jira/browse/FLINK-15805
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Affects Versions: 1.9.1, 1.10.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Critical
>
> {code:java}
> org.apache.flink.util.SerializedThrowable: Caught exception while processing
> timer.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_232]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_232]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_232]
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException:
> Uploading parts failed
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>
> ... 7 more
> Caused by: org.apache.flink.util.SerializedThrowable: Uploading parts failed
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
> ~[?:?]
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
> ~[?:?]
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
> ~[?:?]
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
> ~[?:?]
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
> ~[?:?]
> at
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
> ~[?:?]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
> ~[flink-dist_2.11-1.9.1-stream2.jar:?]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:235)
> ~[flink-dist_2.11-1.9.1-stream2.jar:?]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:334)
> ~[flink-dist_2.11-1.9.1-stream2.jar:?]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:297)
> ~[flink-dist_2.11-1.9.1-stream2.jar:?]
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:364)
> ~[flink-dist_2.11-1.9.1-stream2.jar:?]
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
> ~[flink-dist_2.11-1.9.1-stream2.jar:1.9.1-stream2]
> ... 7 more
> Caused by: org.apache.flink.util.SerializedThrowable: upload part on
> BUCKET/2019/12/05/22/04/part-5-30280:
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
> The specified upload does not exist. The upload ID may be invalid, or the
> upload may have been aborted or completed.
> S3 Extended Request ID:
> tKO5p6woMJi1tDIp+KBnZ4BnkHMw1W1XIbF+B8YjxSIPG6NS21Nnh/RoYPumkvuP0WROIiCTj2k=:NoSuchUpload
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:225)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
> ~[?:?]
> at
> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73)
> ~[?:?]
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318)
> ~[?:?]
> at
> org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
> ~[?:?]
> ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: The specified upload
> does not exist. The upload ID may be invalid, or the upload may have been
> aborted or completed.
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
> ~[?:?]
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
> ~[?:?]
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
> ~[?:?]
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
> ~[?:?]
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
> ~[?:?]
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
> ~[?:?]
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
> ~[?:?]
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
> ~[?:?]
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
> ~[?:?]
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
> ~[?:?]
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
> ~[?:?]
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
> ~[?:?]
> at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
> ~[?:?]
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
> ~[?:?]
> at
> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73)
> ~[?:?]
> at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318)
> ~[?:?]
> at
> org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
> ~[?:?]
> ... 3 more
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)