[ 
https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920867#comment-16920867
 ] 

Biao Liu commented on FLINK-4399:
---------------------------------

Hi [~till.rohrmann], thanks for the feedback!

{quote}Instead, it could be enough to add some utilities to offload large 
payloads to the BlobServer and then handle all message which can carry a large 
user code payload on a higher level as we do with the 
TaskDeploymentDescriptor.{quote}
I couldn't agree more. I don't think using the RPC system to handle large 
messages is the best practice. Going through the {{BlobServer}}, outside the 
RPC system, seems a good choice to me. I have described the reasons in detail 
in the doc.

{quote}As I've written in the Google doc, I assume that we would also define an 
upper bound for the message size because otherwise it can easily cause OOM 
errors.{quote}
I think you have raised a significant question: should this upper bound exist 
at all?

1. The memory issue, for example the risk of OOM. If the message needs to be 
serialized/deserialized in memory, the risk of OOM can't be avoided even if we 
transfer it through the {{BlobServer}}, because we always have to allocate a 
big buffer to do the serialization/deserialization. So I think the key point 
here is whether or not we serialize/deserialize the large message in memory.
2. Could we cover all risks of large messages through the {{BlobServer}}? I 
guess it's hard to do so. For example, accumulators, metrics and source splits 
are usually quite small, far below the upper limit. However, they could be 
large; it depends on the implementation of the user code. If we use the 
{{BlobServer}} to transfer all these messages, the performance might regress a 
lot for the general case. It's also hard to judge whether a message is large 
or not without serializing it.
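
To illustrate both points, here is a minimal sketch using plain Java serialization. It is not Flink code: the class name {{PayloadSizeCheck}}, the method names and the 1 KiB threshold are all made up for illustration. It only shows that the size is known after the whole payload has been serialized into an in-memory buffer, so the allocation happens whichever transport is chosen afterwards.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper, not part of Flink.
final class PayloadSizeCheck {

    // Illustrative threshold only; not a real Flink or Akka setting.
    static final int OFFLOAD_THRESHOLD_BYTES = 1024;

    // Serializes the whole payload into an in-memory buffer.
    // This is the allocation that carries the OOM risk from point 1.
    static byte[] serialize(Serializable payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // The decision can only be made on the already serialized bytes.
    static boolean shouldOffload(byte[] serialized) {
        return serialized.length > OFFLOAD_THRESHOLD_BYTES;
    }
}
```

With this sketch, a short accumulator name stays below the threshold and would go through RPC, while something like a serialized {{new byte[4096]}} exceeds it and would be offloaded. In both cases the full byte array already exists in memory before the check runs.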

To sum up, I guess the upper bound on the RPC message size is not as important 
as we imagined. Alternatively, a very large upper bound (maybe 100M+) would be 
meaningful as protection. Either way, we can't get rid of all risks by going 
through the {{BlobServer}} outside the RPC system.

So IMO the best practice for handling large messages is:
1. Use the {{BlobServer}}, outside the RPC system, to handle messages that are 
likely to be large, like {{JobGraph}} and {{TaskDeploymentDescriptor}}. Adding 
some common utilities is a good idea.
2. The RPC system could support large messages without an upper bound or with 
a very large upper bound. The performance might not be good, but that's a 
performance issue which the user should tune. Splitting the large message into 
chunks or setting a larger RPC size limit are both acceptable to me. Splitting 
messages is friendlier to the other small messages queuing in the RPC system, 
while setting a large size limit does not need any development.
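
As a rough idea of what the chunking option in point 2 could look like, here is a minimal sketch on plain byte arrays. It is not tied to the Flink RPC system: {{MessageChunker}}, {{split}} and {{reassemble}} are hypothetical names, and a real implementation would also need message ids, ordering and failure handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink.
final class MessageChunker {

    // Splits an already serialized message into chunks of at most maxChunkSize bytes.
    static List<byte[]> split(byte[] message, int maxChunkSize) {
        if (maxChunkSize <= 0) {
            throw new IllegalArgumentException("maxChunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        int offset = 0;
        while (offset < message.length) {
            int length = Math.min(maxChunkSize, message.length - offset);
            chunks.add(Arrays.copyOfRange(message, offset, offset + length));
            offset += length;
        }
        return chunks;
    }

    // Concatenates the chunks, in order, back into the original message.
    static byte[] reassemble(List<byte[]> chunks) {
        int total = 0;
        for (byte[] chunk : chunks) {
            total += chunk.length;
        }
        byte[] message = new byte[total];
        int offset = 0;
        for (byte[] chunk : chunks) {
            System.arraycopy(chunk, 0, message, offset, chunk.length);
            offset += chunk.length;
        }
        return message;
    }
}
```

Each chunk can then be sent as its own small RPC message, so other small messages can be interleaved between chunks instead of waiting behind one huge frame. Note that the receiver in this sketch still rebuilds the whole message in memory, so chunking alone does not remove the memory concern from point 1.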

 

> Add support for oversized messages
> ----------------------------------
>
>                 Key: FLINK-4399
>                 URL: https://issues.apache.org/jira/browse/FLINK-4399
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>         Environment: FLIP-6 feature branch
>            Reporter: Stephan Ewen
>            Assignee: Biao Liu
>            Priority: Major
>              Labels: flip-6
>
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than the Framesize, as may happen for:
>   - {{collect()}} calls that collect large data sets (via accumulators)
>   - Job submissions and operator deployments where the function closures are 
> large (for example because they contain large pre-loaded data)
>   - Function restore in cases where restored state is larger than 
> checkpointed state (union state)
> I suggest using the {{BlobManager}} to transfer large payloads.
>   - On the sender side, oversized messages are stored under a transient blob 
> (which is deleted after first retrieval, or after a certain number of minutes)
>   - The sender sends a "pointer to blob message" instead.
>   - The receiver grabs the message from the blob upon receiving the pointer 
> message
> The RPC Service should be optionally initializable with a "large message 
> handler" which is internally the {{BlobManager}}.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
