[ 
https://issues.apache.org/jira/browse/HAMA-759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13671427#comment-13671427
 ] 

Suraj Menon commented on HAMA-759:
----------------------------------

Thanks for noticing. The piece of code you have brought to attention will be 
removed. The solution would be to send a series of BSPMessageBundle objects. 
Apologies for the lack of documentation on the spilling queue implementation; 
I shall update it as soon as I get some time. For now, I think we can implement 
a MessageDispatcher that sends BSPMessageBundle objects to the specified 
destination peer.
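
A minimal sketch of the "series of bundles" idea, not actual Hama code. It 
assumes a plain List<M> as a stand-in for BSPMessageBundle<M> and a Consumer as 
a stand-in for the transfer call to the destination peer; the class name, 
method name and maxPerBundle parameter are all hypothetical. The point is only 
that each bundle is handed off as soon as it is full, so at most maxPerBundle 
messages are held in memory at a time.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not Hama code: List<M> stands in for BSPMessageBundle<M>,
// and the Consumer stands in for the call that transfers a bundle to a peer.
final class ChunkedDispatchSketch {

  private ChunkedDispatchSketch() {}

  /**
   * Drains the iterator into bundles of at most maxPerBundle messages and
   * hands each bundle to the sender as soon as it is full.
   * Returns the number of bundles sent.
   */
  static <M> int sendInChunks(Iterator<M> messages, int maxPerBundle,
      Consumer<List<M>> sender) {
    if (maxPerBundle <= 0) {
      throw new IllegalArgumentException("maxPerBundle must be positive");
    }
    int bundlesSent = 0;
    List<M> bundle = new ArrayList<>();
    while (messages.hasNext()) {
      bundle.add(messages.next());
      if (bundle.size() == maxPerBundle) {
        sender.accept(bundle);
        bundlesSent++;
        // Start a fresh bundle so the one just sent can be garbage collected.
        bundle = new ArrayList<>();
      }
    }
    if (!bundle.isEmpty()) {
      sender.accept(bundle); // final, partially filled bundle
      bundlesSent++;
    }
    return bundlesSent;
  }
}
```

With five messages and maxPerBundle set to 2, the sender would be called three 
times, the last time with a single message. A real dispatcher would probably 
bound the bundle by serialized size in bytes rather than by message count.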

Also, in the spilling queue implementation, the combiner is one of the 
SpilledDataProcessor implementations that work on the data in the spilled 
ByteBuffer. As I said, I shall update the Wiki with all this information.
                
> When a peer sends messages, no matter what kind of queue is used, all 
> messages are read into memory.
> --------------------------------------------------------------------------------------------
>
>                 Key: HAMA-759
>                 URL: https://issues.apache.org/jira/browse/HAMA-759
>             Project: Hama
>          Issue Type: Improvement
>          Components: bsp core
>    Affects Versions: 0.6.1
>            Reporter: MaoYuan Xian
>
> Before a BSP peer sends messages to the network, all messages have to be read 
> into memory, no matter what kind of outgoing queue is used. This means that 
> even when the user uses DiskQueue to send messages, no memory is actually 
> saved. In the sync() method of BSPPeerImpl.java:
> {code}
>    while (it.hasNext()) {
>       ... 
>       BSPMessageBundle<M> bundle = combineMessages(messages);
>       // remove this message during runtime to save a bit of memory
>       it.remove();
>       if (combiner != null) {
>         try {
>           messenger.transfer(addr, bundle);
>         } catch (Exception e) {
>           LOG.error("Error while sending messages", e);
>         }
> {code}
> In the combineMessages method, all messages are put into memory:
> {code}
>   private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
>     BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
>     if (combiner != null) {
>       bundle.addMessage(combiner.combine(messages));
>     } else {
>       for (M message : messages) {
>         bundle.addMessage(message);
>       }
>     }
>     return bundle;
>   }
> {code}
> Can we make some modifications here and implement a sending bundle of 
> configurable size, so that only part of the messages is collected before 
> sending to the network, instead of all of them?
