[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15860770#comment-15860770
 ] 

Vlad Rozov commented on APEXCORE-635:
-------------------------------------

- A memory copy is required because it moves data from the JVM heap to a direct
(off-heap) buffer. It is done before the buffer is written to a network channel.
- Please benchmark copying one large block of memory in a single step and in
multiple steps. What is the difference (in %)? Do the same for a direct buffer.
- How will control tuples be injected into a contiguous block of memory?
- The current approach is fire and forget; the proposed one requires some kind
of acknowledgment. I am not convinced that a design that requires an
acknowledgment will be faster.
- There is a plan to move from netlet to netty, and netty provides a buffer pool
allocator that may better fit the proposed design.
- The current format uses a variable-length (1, 2, 3 or 4 bytes) prefix, so it
is not possible to know the offset where data needs to be written before the
length of the serialized data is known.
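
To illustrate the last point, here is a minimal Java sketch. It assumes a
base-128 style prefix (7 payload bits per byte, high bit set when more bytes
follow); that encoding and the names VarLengthPrefix, prefixSize and
writePrefix are hypothetical and may not match the actual buffer server wire
format. It only shows why the payload offset depends on the payload length.

```java
final class VarLengthPrefix {
    // Hypothetical encoding: 7 payload bits per byte, so 4 bytes cover lengths below 2^28.
    static int prefixSize(int length) {
        if (length < 0 || length >= (1 << 28)) {
            throw new IllegalArgumentException("length out of range: " + length);
        }
        int size = 1;
        while ((length >>>= 7) != 0) {
            size++;
        }
        return size;
    }

    // Writes the prefix at dst[offset..] and returns the number of prefix bytes written.
    static int writePrefix(byte[] dst, int offset, int length) {
        prefixSize(length);  // range check only
        int pos = offset;
        int v = length;
        while ((v & ~0x7F) != 0) {
            dst[pos++] = (byte) ((v & 0x7F) | 0x80);  // low 7 bits plus continuation flag
            v >>>= 7;
        }
        dst[pos++] = (byte) v;
        return pos - offset;
    }

    public static void main(String[] args) {
        // The payload can start 1, 2, 3 or 4 bytes after the prefix start,
        // and which one is only known once the payload length is known.
        for (int len : new int[] {127, 128, 16383, 16384, 2097151, 2097152}) {
            System.out.println(len + " payload bytes -> " + prefixSize(len) + "-byte prefix");
        }
    }
}
```

A serializer that wants to write the payload in place would have to either
reserve the maximum 4 bytes and accept a gap, or shift the payload after the
fact, which is itself a copy.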

> Proposal: Manage memory to avoid memory copy and garbage collection
> -------------------------------------------------------------------
>
>                 Key: APEXCORE-635
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-635
>             Project: Apache Apex Core
>          Issue Type: Wish
>            Reporter: bright chen
>            Assignee: bright chen
>
> Manage memory to avoid memory copy and garbage collection
> The aim of this proposal is to reuse memory in order to avoid garbage
> collection and unnecessary memory copies, and thereby increase performance.
> In this proposal the term serde means serialization and deserialization; it
> is the same as codec.
> Currently, Apex by default uses DefaultStatefulStreamCodec for serde, which
> extends Kryo and optimizes it by replacing the class with a class id. An
> application developer can optimize the serializer by implementing the
> StreamCodec interface.
> First, let’s look at the default codec, DefaultStatefulStreamCodec. As I
> understand it, it basically optimizes serde by replacing the class name with
> a class id. The state information is sent only before the first tuple; it is
> a kind of configuration for the serde. So I suggest separating this feature
> from serde. The benefit is that a customized serde can still use this
> feature. Kryo also has some limitations, which I’ll describe later.
> Second, let’s look at the customized serde from an application developer’s
> point of view and see how to implement StreamCodec. I take a simple tuple,
> List<String>, as an example.
> The first solution is to use Kryo. This is basically the same as the Apex
> default codec.
> The second solution is to implement StreamCodec for String and for List, with
> ListSerde delegating each String to StringSerde. The benefit of this solution
> is that StringSerde and ListSerde can be reused. The problem is that it needs
> a lot of temporary memory and memory copies. A sample implementation follows.
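> // Slice and StreamCodec are the existing Apex/netlet types.
> import java.nio.ByteBuffer;
> import java.nio.charset.StandardCharsets;
> import java.util.List;
>
> class StringSerde {
>   Slice toByteArray(String o) {
>     byte[] b = o.getBytes(StandardCharsets.UTF_8);  // new bytes
>     byte[] b1 = new byte[b.length + 4];             // new bytes
>     ByteBuffer.wrap(b1).putInt(b.length);           // string length in the first 4 bytes
>     System.arraycopy(b, 0, b1, 4, b.length);        // copy bytes
>     return new Slice(b1);
>   }
> }
>
> class ListSerde<T> {
>   StreamCodec<T> itemSerde;  // the serde used to serialize/deserialize an item
>   Slice toByteArray(List<T> list) {
>     Slice[] itemSlices = new Slice[list.size()];
>     int size = 0;
>     int index = 0;
>     for (T item : list) {
>       Slice slice = itemSerde.toByteArray(item);
>       size += slice.length;
>       itemSlices[index++] = slice;
>     }
>     byte[] b = new byte[size + 4];                  // allocate the memory
>     ByteBuffer.wrap(b).putInt(list.size());         // list length in the first 4 bytes
>     int offset = 4;
>     for (Slice slice : itemSlices) {                // copy the data from itemSlices
>       System.arraycopy(slice.buffer, slice.offset, b, offset, slice.length);
>       offset += slice.length;
>     }
>     return new Slice(b);
>   }
> }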
>   
> From the above code, we can see that about twice the required memory is
> allocated and the data is copied twice (one copy may be specific to String,
> but the other copy is mandatory). And once the bytes are written to the
> socket, none of the allocated memory can be reused; it all has to be garbage
> collected.
> The above tuple has only two levels. If a tuple has n levels, n times the
> required memory would be allocated and n-1 data copies would be required.
> The third solution could be to allocate memory and then pass the memory and
> an offset to the item serde. There are some problems with this solution:
> - How is the memory passed from the caller? Our existing interface passes
> only the object, with no way to pass memory, so passing memory would depend
> on the implementation.
> - Another big problem is that it is hard to allocate the right amount of
> memory (for this particular case, it could probably allocate 2 times the
> total string length). Any memory allocated beyond what is required would be
> wasted until the data is sent to the socket (unless the exact amount is
> allocated and the data copied to avoid the waste). The code also needs to
> handle the case where the memory is not enough.
> The fourth solution could be to treat the whole object as flat, allocate
> memory and handle it, as in the example below. This solution solves the
> problem of passing memory, but it has the other problems of the third
> solution and introduces some new ones:
> - The code can’t be reused: we already have StringSerde, but
> ListSerde<String> has to implement almost the same logic again.
> - The serializeItemToMemory() method has to be implemented differently for
> each item type.
> class ListSerde<T> {
>   Slice toByteArray(List<T> list) {
>     byte[] b = new byte[…];      // hard to estimate the proper size
>     int size = 0;
>     for (T item : list) {
>       int length = serializeItemToMemory(item, b, size);
>       size += length;
>     }
>     // allocate new memory and copy the data if the unused space must not be wasted
>     return new Slice(b, 0, size);
>   }
> }
> So, from the analysis of these solutions, it’s not easy to implement a good
> and reusable customized serde.
> Third, let’s look at the Kryo serde. Kryo provides Output, and each field
> serde writes to the same Output. This approach solves the memory problem,
> but Output has some problems too:
> - Output, as a stream, can only be written sequentially, which is a problem.
> For example, when serializing a String in LV (length-value) format, we don’t
> know what the length will be before serialization.
> - Output doesn’t have a cache, which means the serialized data must be copied
> out and managed elsewhere.
> - The allocated memory can’t be reused without extra management.
> - Another copy is required when adding partition information.
> - The memory allocated for different objects is not contiguous, which means
> another copy is needed when merging multiple serialized tuples into one block
> to send to the socket.
> My suggested solution is:
> - Add SerializationBuffer, which extends Kryo’s Output and writes data to
> BlockStream.
> - BlockStream manages a list of blocks; it can reserve space and later fill
> a value into the reserved space, and it can reset the memory when the data is
> no longer used. We can probably use unsafe mode to increase the performance
> of this part in the future.
> - Add a MemReuseCodec interface that extends StreamCodec, deprecate Slice
> toByteArray(T o), and add the method void toByteArray(T o,
> SerializationBuffer output). Here, toByteArray does not return a slice,
> because the codec could be either the top-level codec or the codec of a
> field. Call SerializationBuffer.toSlice() to get the slice of serialized
> data.
> - In Publisher, keep two lists/arrays of slices: one for serializing the
> tuples, the other for sending to the socket. When woken up for writing,
> switch the lists/arrays, then merge the slices into one large slice and call
> socket write. Reset the stream after the data is written.
> So the previous ListSerde can be implemented as follows:
> class ListSerde<T> {
>   MemReuseCodec itemSerde;  //the serde for serialize/deserialize item
>   void toByteArray(List<T> list, SerializationBuffer buffer) {
>     buffer.reserveForLength();
>     for(T item : list) {
>       itemSerde.toByteArray(item, buffer);
>     }
>     buffer.fillLength();
>   }
> }
> The benefits of this mechanism:
> - The memory can be reused instead of garbage collected after the data is
> sent to the socket.
> - It avoids unnecessary memory copies; basically it can avoid all the extra
> copies required by Kryo.
> - The data sent to the socket can easily be merged into one block without an
> extra memory copy.
> - It integrates easily with the Kryo serde, because SerializationBuffer
> extends Output.
> The work needed to integrate this mechanism into Apex without modifying
> netlet:
> - Add a MemReuseCodec field to BufferServerPublisher, initialized in setup()
> if the codec implements MemReuseCodec.
> - Change DefaultStatefulStreamCodec to be implemented using
> SerializationBuffer.
> - To integrate with the socket, basically only write(byte[] message, int
> offset, int size) and write() need to be overridden. Unfortunately, write()
> is final, so the following workaround is needed: add an interface
> ListenerExt with a single method writeExt(); change BufferServerPublisher to
> implement ListenerExt; add DefaultEventLoopExt, which extends
> DefaultEventLoop and overrides handleSelectedKey so that, for a selection key
> with OP_WRITE, it calls ListenerExt.writeExt() if the attachment implements
> ListenerExt and write() otherwise.
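
The reserve-then-fill pattern used by the proposed ListSerde
(buffer.reserveForLength() followed by buffer.fillLength()) could look roughly
like the sketch below. SketchBuffer and SketchSerdes are hypothetical stand-ins,
not the proposed SerializationBuffer or any Apex/Kryo API; the sketch assumes a
single fixed-capacity heap buffer and a fixed 4-byte length slot holding the
byte count, rather than the list of blocks and the variable-length prefix
discussed above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for the proposed SerializationBuffer. */
final class SketchBuffer {
    private final ByteBuffer buf;
    private final Deque<Integer> reserved = new ArrayDeque<>();

    SketchBuffer(int capacity) {
        buf = ByteBuffer.allocate(capacity);
    }

    /** Skips 4 bytes and remembers their position so nested values can follow directly. */
    void reserveForLength() {
        reserved.push(buf.position());
        buf.position(buf.position() + 4);
    }

    /** Back-fills the most recently reserved slot with the byte count written after it. */
    void fillLength() {
        int slot = reserved.pop();
        buf.putInt(slot, buf.position() - slot - 4);
    }

    void write(byte[] bytes) {
        buf.put(bytes);
    }

    /** Copies out what was written; a real implementation would hand out a view instead. */
    byte[] toBytes() {
        return Arrays.copyOf(buf.array(), buf.position());
    }

    /** Makes the same memory available for the next tuple. */
    void reset() {
        buf.clear();
        reserved.clear();
    }
}

final class SketchSerdes {
    static void writeString(String s, SketchBuffer out) {
        out.reserveForLength();
        // getBytes still allocates; a real codec would encode straight into the buffer.
        out.write(s.getBytes(StandardCharsets.UTF_8));
        out.fillLength();
    }

    static void writeList(List<String> list, SketchBuffer out) {
        out.reserveForLength();
        for (String item : list) {
            writeString(item, out);  // nested serde writes into the same buffer
        }
        out.fillLength();
    }

    public static void main(String[] args) {
        SketchBuffer out = new SketchBuffer(1024);
        writeList(Arrays.asList("ab", "cde"), out);
        System.out.println(Arrays.toString(out.toBytes()));
        out.reset();  // memory is reused for the next tuple
    }
}
```

The stack of reserved positions is what lets the list serde and the string
serde share one buffer without knowing about each other. With a
variable-length prefix the reserved slot could not be a fixed 4 bytes, which
is the concern raised in the comment above.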



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
