[
https://issues.apache.org/jira/browse/FLINK-9677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524956#comment-16524956
]
ASF GitHub Bot commented on FLINK-9677:
---------------------------------------
GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/6217
[FLINK-9677][rest] Cleanup encoder after request has been processed
## What is the purpose of the change
This PR fixes an issue in the `RestClient` where the
`HttpPostRequestEncoder` would be closed before the request submission was
complete. `Channel#writeAndFlush();` doesn't block which lead to a race
condition between the actual writing and `HttpPostRequestEncoder#cleanFiles()`.
Instead we now register a listener on the last `ChannelFuture` to cleanup
the encoder once the writing is complete.
## Verifying this change
* `MultipartUploadResource` was modified to also include the upload of a
large (64mb) mb file, which is used by `FileUploadHandlerTest` and
`RestClientMultipartTest`
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 9677
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6217.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6217
----
commit 48c10dd686ac97cef6db4703fe61dffa5ff7d913
Author: zentol <chesnay@...>
Date: 2018-06-27T07:34:34Z
[FLINK-9677][rest] Cleanup encoder after request has been processed
----
> RestClient fails for large uploads
> ----------------------------------
>
> Key: FLINK-9677
> URL: https://issues.apache.org/jira/browse/FLINK-9677
> Project: Flink
> Issue Type: Bug
> Components: REST
> Affects Versions: 1.6.0, 1.5.1
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Uploading a large file via the {{RestClient}} can lead to an exception on the
> server:
> {code:java}
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder$ErrorDataDecoderException:
> java.io.IOException: Out of size: 67115495 > 67108864
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.loadDataMultipart(HttpPostMultipartRequestDecoder.java:1377)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.getFileUpload(HttpPostMultipartRequestDecoder.java:907)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.decodeMultipart(HttpPostMultipartRequestDecoder.java:551)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.parseBodyMultipart(HttpPostMultipartRequestDecoder.java:442)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.parseBody(HttpPostMultipartRequestDecoder.java:411)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.offer(HttpPostMultipartRequestDecoder.java:336)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.offer(HttpPostMultipartRequestDecoder.java:53)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.offer(HttpPostRequestDecoder.java:227)
> at
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:112)
> at
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:66)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:413)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Out of size: 67115495 > 67108864
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.AbstractDiskHttpData.addContent(AbstractDiskHttpData.java:151)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.loadDataMultipart(HttpPostMultipartRequestDecoder.java:1375)
> ... 34 more{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)