[ 
https://issues.apache.org/jira/browse/FLINK-11187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725942#comment-16725942
 ] 

Steve Loughran commented on FLINK-11187:
----------------------------------------

There's limits to how well you can buffer...if the post fails it goes back to 
byte 0 and retries, which is done with a reset() to the marker set when 
uploading that part began.

Better to serve up directly from a file, if that's the source, or, if its 
memory, having an input stream which [supports mark/reset to anywhere in the 
entire 
block|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java#L640]

> StreamingFileSink with S3 backend transient socket timeout issues 
> ------------------------------------------------------------------
>
>                 Key: FLINK-11187
>                 URL: https://issues.apache.org/jira/browse/FLINK-11187
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystem, Streaming Connectors
>    Affects Versions: 1.7.0, 1.7.1
>            Reporter: Addison Higham
>            Assignee: Addison Higham
>            Priority: Major
>             Fix For: 1.7.2
>
>
> When using the StreamingFileSink with S3A backend, occasionally, errors like 
> this will occur:
> {noformat}
> Caused by: 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>  Your socket connection to the server was not read from or written to within 
> the timeout period. Idle connections will be closed. (Service: Amazon S3; 
> Status Code: 400; Error Code: RequestTimeout; Request ID: xxx; S3 Extended 
> Request ID: xxx, S3 Extended Request ID: xxx
>        at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
>        at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>        at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056){noformat}
> This causes a restart of flink job, which is often able to recover from, but 
> under heavy load, this can become very frequent.
>  
> Turning on debug logs you can find the following relevant stack trace:
> {noformat}
> 2018-12-17 05:55:46,546 DEBUG 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  - FYI: 
> failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>   at 
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>   at 
> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>   at 
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>   at 
> org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748){noformat}
> This error occurs because of a transient failure in writing a multipart chunk 
> fails and the underlying InputStream cannot be reset. This ResetException 
> should be thrown to the client (as documented here: 
> [https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html])
>  but for some reason is not, instead, the client is retrying the request, but 
> now with a fully consumed InputStream. Because this InputStream is 
> empty/smaller we can't fill up the Content-Length that the multipart upload 
> is expecting, so the socket hangs to eventually be timed out.
>  
> This failure happens roughly ~20 times before the AWS client retry logic 
> finally fails the request and the socket time out exception is thrown.
>  
> As mentioned in the best practice AWS doc, the best fix for this is to use a 
> File or FileInputStream object or to use the setReadLimit. I tried to use a 
> global SDK property (com.amazonaws.sdk.s3.defaultStreamBufferSize) to set 
> this value, but that did not fix the problem, which I believe is because the 
> InputStream is not mark-able and the AWS client doesn't wrap the stream.
>  
> What is confirmed to work is the following patch: 
> [https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6]
>  
> That is obviously not ideal, but it may suffice to just make that 
> configurable.
>  
> The other option is to instead expose the S3A WriteHelper option to pass a 
> file to the S3AccessHelper and change the other relevant classes 
> (RefCountedFSOutputStream) to expose the File object and directly hand that 
> to the S3A WriteHelper



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to