[ https://issues.apache.org/jira/browse/FLINK-25670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484487#comment-17484487 ]
Kyle commented on FLINK-25670:
------------------------------
Actually, we are going to handle about 90,000 * 100 = 9,000,000 doubles (72MB)
received every 15 minutes. During processing, 9,000,000 * 6 = 54,000,000
doubles (432MB) are retrieved and updated.
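The sizes above follow from 8 bytes per IEEE 754 double and decimal megabytes (1MB = 1,000,000 bytes). A quick check of that arithmetic, using only the counts stated in this comment:

```python
import struct

# Size of a native C double; 8 bytes (IEEE 754 binary64) on common platforms.
BYTES_PER_DOUBLE = struct.calcsize("d")

def doubles_to_mb(count: int) -> float:
    """Convert a count of doubles to decimal megabytes (1 MB = 1,000,000 bytes)."""
    return count * BYTES_PER_DOUBLE / 1_000_000

received = 90_000 * 100   # doubles received every 15 minutes
processed = received * 6  # doubles retrieved and updated during processing

print(received, doubles_to_mb(received))    # 9000000 72.0
print(processed, doubles_to_mb(processed))  # 54000000 432.0
```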
We'd like to keep these 432MB of data as state, with high performance and
guaranteed correctness. If the data were kept in external storage (e.g.
S3/HDFS), we could simply write it to that storage directly, and there would
be no need to use StateFun or Flink at all.
What else would you suggest? How could we shard the requests and partition
the large state across the StateFun cluster?
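One way to approach the partitioning question: StateFun scopes persisted state to a function address (function type plus id) and routes messages by address, so spreading one large state over many ids also spreads it across the cluster. A minimal sketch, assuming the 9,000,000 values can be addressed by a flat index; `shard_id_for`, `split_into_shards`, the `shard-N` id format, and the shard size of 90,000 values are hypothetical choices for illustration, not StateFun APIs.

```python
from array import array

TOTAL_VALUES = 9_000_000  # doubles per 15-minute batch, from the figures above
SHARD_SIZE = 90_000       # hypothetical: values owned by one function id (~720 KB)

def shard_id_for(index: int, shard_size: int = SHARD_SIZE) -> str:
    """Map a global value index to the (hypothetical) id of the function instance owning it."""
    return f"shard-{index // shard_size}"

def split_into_shards(values: array, shard_size: int = SHARD_SIZE) -> dict:
    """Split a flat array of doubles into per-shard byte payloads keyed by shard id."""
    shards = {}
    for start in range(0, len(values), shard_size):
        chunk = values[start:start + shard_size]  # slicing an array yields an array
        shards[shard_id_for(start, shard_size)] = chunk.tobytes()
    return shards

num_shards = -(-TOTAL_VALUES // SHARD_SIZE)  # ceiling division
print(num_shards, shard_id_for(TOTAL_VALUES - 1))  # 100 shard-99

# Small demo: 1,000 values split into shards of 300 doubles each.
demo = split_into_shards(array("d", [float(i) for i in range(1000)]), shard_size=300)
print({k: len(v) for k, v in demo.items()})
# {'shard-0': 2400, 'shard-1': 2400, 'shard-2': 2400, 'shard-3': 800}
```

Each payload would then be persisted by, and its updates routed to, the function instance with that id, so no single invocation has to carry the whole 432MB.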
> StateFun: Unable to handle oversize HTTP message if state size is large
> -----------------------------------------------------------------------
>
> Key: FLINK-25670
> URL: https://issues.apache.org/jira/browse/FLINK-25670
> Project: Flink
> Issue Type: Bug
> Components: Stateful Functions
> Affects Versions: statefun-3.1.1
> Reporter: Kyle
> Priority: Major
> Attachments: 00-module.yaml, functions.py
>
>
> Our requirements call for handling state of about 500MB (the attached code
> allocates 72MB of state in a commented-out section). However, the HTTP
> message size limit prevents us from sending large state back to the
> StateFun cluster after it has been saved in a Stateful Function.
> Another question is whether large payloads can be sent to a Stateful
> Function from an ingress.
>
> 2022-01-17 07:57:18,416 WARN org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #10)ToFunctionRequestSummary(address=Address(example, hello, 5555), batchSize=1, totalSizeInBytes=80, numberOfStates=2)
> org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException: Response entity too large: DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)
> HTTP/1.1 200 OK
> Content-Type: application/octet-stream
> Content-Length: 40579630
> Date: Mon, 17 Jan 2022 07:57:18 GMT
> Server: Python/3.9 aiohttp/3.8.1
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator.handleOversizedMessage(HttpObjectAggregator.java:276) ~[statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator.handleOversizedMessage(HttpObjectAggregator.java:87) ~[statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageAggregator.invokeHandleOversizedMessage(MessageAggregator.java:404) ~[statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageAggregator.decode(MessageAggregator.java:254) ~[statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88) ~[statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:425) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [statefun-flink-distribution.jar:3.1.1]
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [statefun-flink-distribution.jar:3.1.1]
>     at java.lang.Thread.run(Unknown Source) [?:?]
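The quoted trace shows a single function response with a `Content-Length` of 40,579,630 bytes (about 38.7 MiB) being rejected. In StateFun's remote HTTP protocol each invocation request carries the function's persisted values and each response carries its state mutations, so one function instance's state has to fit within a single HTTP message. A rough sizing sketch for how many function ids the 432MB of state would need to be split across; `min_function_ids`, the 32 MiB cap, and the default usable fraction of 0.5 are assumptions for illustration, not StateFun defaults, so substitute the limit your deployment actually enforces.

```python
import math

# Hypothetical per-message cap used only for illustration; check the limit
# your StateFun version and transport configuration actually enforce.
ASSUMED_MAX_MESSAGE_BYTES = 32 * 1024 * 1024

def min_function_ids(state_bytes: int,
                     max_message_bytes: int = ASSUMED_MAX_MESSAGE_BYTES,
                     usable_fraction: float = 0.5) -> int:
    """Smallest number of function ids such that each id's share of the state
    uses at most `usable_fraction` of the per-message cap. The remainder is
    left for protocol framing, other state values, and the messages themselves."""
    if not 0 < usable_fraction <= 1:
        raise ValueError("usable_fraction must be in (0, 1]")
    budget = int(max_message_bytes * usable_fraction)
    return math.ceil(state_bytes / budget)

# Under the assumed cap, the response from the trace would also be rejected.
observed_response = 40_579_630
print(observed_response > ASSUMED_MAX_MESSAGE_BYTES)       # True
print(min_function_ids(432_000_000))                       # 26
print(min_function_ids(432_000_000, usable_fraction=1.0))  # 13
```

Leaving headroom (the `usable_fraction` parameter) matters because the message also carries every other state value of that function and the protocol's own framing, so sizing shards right at the cap is fragile.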