[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bright chen updated APEXCORE-635:
---------------------------------
    Description: 
Manage memory to avoid memory copy and garbage collection

The aim of this proposal is to reuse memory, so that garbage collection and 
unnecessary memory copies are avoided and performance improves. In this 
proposal the term serde means serialization and deserialization; it is used 
interchangeably with codec.

Currently, Apex uses DefaultStatefulStreamCodec for serde by default. It 
extends Kryo and optimizes it by replacing the class with a class id. 
Application developers can optimize serialization by implementing the 
StreamCodec interface. 

First, let's look into the default codec, DefaultStatefulStreamCodec. As I 
understand it, it basically optimizes serde by replacing the class name with a 
class id. The state information is sent only once, before the first tuple, so 
it is a kind of configuration for serde. I therefore suggest separating this 
feature from serde. The benefit is that a customized serde can still use this 
feature. Kryo also has some limitations, which I'll state later.
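
To illustrate the class-name-to-class-id idea, here is a minimal sketch in 
plain Java. It is not the DefaultStatefulStreamCodec implementation: 
ClassIdRegistry and its methods are hypothetical names used only for 
illustration. The registry assigns a small integer id the first time a class 
is seen and reports whether the (id, class name) pair still has to be sent as 
state ahead of the tuple.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Apex: maps class names to compact ids so that
// the full name only has to travel once, as state, ahead of the first tuple.
class ClassIdRegistry {
  private final Map<String, Integer> idsByName = new HashMap<>();
  private int nextId = 0;
  private boolean lastWasNew = false;

  // Returns the id for the class, assigning a new one on first use.
  int idFor(Class<?> type) {
    Integer id = idsByName.get(type.getName());
    lastWasNew = (id == null);
    if (id == null) {
      id = nextId++;
      idsByName.put(type.getName(), id);
    }
    return id;
  }

  // True if the most recent idFor() call created a new mapping, meaning the
  // (id, class name) pair must be sent as state before the tuple itself.
  boolean lastLookupNeedsState() {
    return lastWasNew;
  }
}
```

Because the mapping is independent of how the tuple bytes are produced, a 
helper of this kind could in principle sit in front of any serde, which is the 
point of separating the feature.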


Second, let's look at customized serde from the application developer's point 
of view, and at how to implement StreamCodec. I take a simple tuple, 
List<String>, as an example.

The first solution is to use Kryo. This is basically the same as the Apex 
default codec.

The second solution is to implement StreamCodec for String and for List, with 
ListSerde delegating each String to StringSerde. The benefit of this solution 
is that StringSerde and ListSerde can be reused. The problem is that it needs 
a lot of temporary memory and memory copies. A sample implementation follows.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class StringSerde {
  Slice toByteArray(String o) {
    byte[] b = o.getBytes(StandardCharsets.UTF_8);   // new bytes
    byte[] b1 = new byte[b.length + 4];              // new bytes
    ByteBuffer.wrap(b1).putInt(b.length);            // length of the string in the first 4 bytes
    System.arraycopy(b, 0, b1, 4, b.length);         // copy bytes
    return new Slice(b1);
  }
}

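import java.nio.ByteBuffer;
import java.util.List;

class ListSerde<T> {
  StreamCodec<T> itemSerde;  // the serde used to serialize/deserialize each item

  Slice toByteArray(List<T> list) {
    Slice[] itemSlices = new Slice[list.size()];
    int size = 0;
    int index = 0;
    for (T item : list) {
      Slice slice = itemSerde.toByteArray(item);
      size += slice.length;
      itemSlices[index++] = slice;
    }
    byte[] b = new byte[size + 4];                   // allocate the memory
    ByteBuffer out = ByteBuffer.wrap(b);
    out.putInt(size);                                // length of the list (taken here as its byte length) in the first 4 bytes
    for (Slice slice : itemSlices) {                 // copy the data from itemSlices
      out.put(slice.buffer, slice.offset, slice.length);
    }
    return new Slice(b);
  }
}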
  
From the code above, we can see that around twice the required memory is 
allocated and the data is copied twice (one copy may be specific to String, 
but the other copy is mandatory). And once the bytes are written to the 
socket, none of the allocated memory can be reused; it all has to be garbage 
collected.

The tuple above has only two levels. If a tuple has n levels, n times the 
required memory would be allocated and n-1 data copies would be required.

The third solution could be to allocate memory up front and then pass the 
memory and an offset to the item serde. There are some problems with this 
solution:
- How is the memory passed from the caller? The existing interface only passes 
  the object and has no way to pass memory, so how the memory is passed would 
  depend on the implementation.
- Another big problem is that it is hard to preallocate the proper amount of 
  memory (for this special case, it could probably allocate 2 times the total 
  string length). Memory allocated beyond what is required is wasted until the 
  data is sent to the socket (the alternative is to allocate exactly the 
  required memory afterwards and copy the data into it). The code also needs 
  to handle the case where the memory is not enough. 

The fourth solution could be to treat the whole object as flat, allocate 
memory, and handle it, as in the example below. This solution solves the 
problem of passing memory, but it has the other problems of the third solution 
and introduces some new ones:

- The code can't be reused: we already have StringSerde, but 
  ListSerde<String> has to implement almost the same logic again. 
- The serializeItemToMemory() method has to be implemented differently for 
  each item type.

class ListSerde<T> {
  Slice toByteArray(List<T> list) {
    byte[] b = new byte[…];      // hard to estimate the proper size
    int size = 0;
    for (T item : list) {
      int length = serializeItemToMemory(item, b, size);
      size += length;
    }
    // allocate new memory and copy the data into it if the unused tail of b
    // should not be wasted
    return new Slice(b, 0, size);
  }
}

So, from the analysis of these solutions, it is not easy to implement a good 
and reusable custom serde.



Third, let's look at the Kryo serde. Kryo provides Output, so each field serde 
writes to the same Output. This approach solves the memory problem, but Output 
has some problems too:

- Output, as a stream, can only be written sequentially. That is a problem, 
  for example, when serializing a String to LV (length-value) format: we 
  don't know the length before serialization. 
- Output doesn't retain the serialized data, which means the data must be 
  copied out and managed elsewhere.
- The allocated memory can't be reused without extra management.
- Another copy is required when adding partition information.
- The memory allocated for different objects is not contiguous, which means 
  another copy is needed when merging multiple serialized tuples into one 
  block to send to the socket.
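
The length-prefix problem in the first item can be illustrated with JDK 
streams. This is a sketch using java.io classes as a stand-in for a 
forward-only stream, not Kryo's Output; ForwardOnlyLv is a hypothetical name. 
Because the stream cannot go back and patch the length, the value has to be 
encoded into a temporary array first and then copied.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration: LV (length-value) encoding on a forward-only stream.
class ForwardOnlyLv {
  // The length must be written before the value, so the value is encoded into a
  // temporary array first, only to learn its size.
  static void writeLv(DataOutputStream out, String value) throws IOException {
    byte[] encoded = value.getBytes(StandardCharsets.UTF_8); // temporary allocation
    out.writeInt(encoded.length);                            // 4-byte length prefix
    out.write(encoded);                                      // copy into the stream
  }

  static byte[] serialize(String value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      writeLv(out, value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();                              // copy out of the stream
  }
}
```

A buffer that can reserve the length slot and fill it in afterwards removes 
the need for the temporary array.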


My suggested solution is:

- Add SerializationBuffer, which extends Kryo's Output and writes data to a 
  BlockStream. 
- BlockStream manages a list of blocks. It can reserve space and later fill a 
  value into the reserved space, and it can reset the memory when the data is 
  no longer used. We can probably use unsafe mode to improve the performance 
  of this part in the future.
- Add a MemReuseCodec interface which extends StreamCodec, deprecates Slice 
  toByteArray(T o), and adds the method void toByteArray(T o, 
  SerializationBuffer output). Here toByteArray does not return a slice, 
  because the codec could be either the top-level codec or the codec of a 
  field. Call SerializationBuffer.toSlice() to get the slice of serialized 
  data.
- In Publisher, keep two lists/arrays of slices: one for serializing the 
  tuples and the other for sending to the socket. When woken up for writing, 
  switch the lists/arrays, then merge the slices into one large slice and 
  call the socket write. Reset the stream after the data is written.
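
The list-switching step in the Publisher can be sketched as follows. This is 
an illustration under assumptions, not BufferServerPublisher code: 
DoubleBufferedSlices is a hypothetical class, plain byte arrays stand in for 
slices, and a single writer thread is assumed. The merge below copies for 
simplicity; in the proposal the slices would sit in contiguous BlockStream 
memory, so merging would not need a copy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the publisher side of the proposal.
class DoubleBufferedSlices {
  private List<byte[]> filling = new ArrayList<>();   // receives newly serialized tuples
  private List<byte[]> sending = new ArrayList<>();   // drained by the single writer thread

  synchronized void add(byte[] serializedTuple) {
    filling.add(serializedTuple);
  }

  // Called when woken up for writing: switch the lists, merge the pending
  // slices into one block for a single socket write, then reset the drained list.
  byte[] swapAndMerge() {
    List<byte[]> toSend;
    synchronized (this) {
      toSend = filling;
      filling = sending;   // already empty: it was cleared after the previous write
      sending = toSend;
    }
    int total = 0;
    for (byte[] slice : toSend) {
      total += slice.length;
    }
    byte[] merged = new byte[total];
    int pos = 0;
    for (byte[] slice : toSend) {
      System.arraycopy(slice, 0, merged, pos, slice.length);
      pos += slice.length;
    }
    toSend.clear();        // "reset after data written"
    return merged;
  }
}
```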

So the previous ListSerde can be implemented as follows:
class ListSerde<T> {
  MemReuseCodec itemSerde;  //the serde for serialize/deserialize item

  void toByteArray(List<T> list, SerializationBuffer buffer) {
    buffer.reserveForLength();
    for(T item : list) {
      itemSerde.toByteArray(item, buffer);
    }
    buffer.fillLength();
  }
}
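
To make the reserveForLength()/fillLength() calls concrete, here is a 
simplified, self-contained sketch. It is an assumption-laden stand-in, not the 
proposed SerializationBuffer: SketchBuffer is a hypothetical class that uses 
one growable array instead of a BlockStream and does not extend Kryo's Output. 
It shows how a length slot can be reserved, the payload written directly 
behind it, and the slot patched afterwards, including for nested records.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified stand-in for SerializationBuffer.
class SketchBuffer {
  private byte[] data = new byte[64];
  private int position = 0;
  private final Deque<Integer> reserved = new ArrayDeque<>(); // offsets of open length slots

  private void ensure(int extra) {
    if (position + extra > data.length) {
      data = Arrays.copyOf(data, Math.max(data.length * 2, position + extra));
    }
  }

  void write(byte[] src) {
    ensure(src.length);
    System.arraycopy(src, 0, data, position, src.length);
    position += src.length;
  }

  // Leaves a 4-byte hole to be patched once the payload length is known.
  void reserveForLength() {
    ensure(4);
    reserved.push(position);
    position += 4;
  }

  // Patches the most recently reserved hole with the number of bytes written after it.
  void fillLength() {
    int slot = reserved.pop();
    int length = position - slot - 4;
    data[slot] = (byte) (length >>> 24);
    data[slot + 1] = (byte) (length >>> 16);
    data[slot + 2] = (byte) (length >>> 8);
    data[slot + 3] = (byte) length;
  }

  // Makes the same memory available for the next batch.
  void reset() {
    position = 0;
    reserved.clear();
  }

  byte[] toBytes() {
    return Arrays.copyOf(data, position);
  }

  // Usage: a List<String> written as nested LV records into one buffer. The String
  // encoding still allocates here; a real implementation could encode in place.
  static byte[] writeList(SketchBuffer buffer, List<String> list) {
    buffer.reserveForLength();
    for (String item : list) {
      buffer.reserveForLength();
      buffer.write(item.getBytes(StandardCharsets.UTF_8));
      buffer.fillLength();
    }
    buffer.fillLength();
    return buffer.toBytes();
  }
}
```

Keeping the open slots on a stack is one possible design choice; it lets an 
item serde reserve and fill its own length while the enclosing list's slot is 
still open.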

The benefits of this mechanism:
- Memory can be reused instead of being garbage collected after the data is 
  sent to the socket.
- Unnecessary memory copies are avoided. Basically, all the extra copies 
  required by Kryo can be avoided.
- The data sent to the socket can easily be merged into one block without an 
  extra memory copy.
- It integrates easily with Kryo serde, because SerializationBuffer extends 
  Output. 


The work needed to integrate this mechanism into Apex without modifying netlet:
- Add a MemReuseCodec field to BufferServerPublisher, initialized in setup() 
  if the codec implements MemReuseCodec.
- Change DefaultStatefulStreamCodec to be implemented using 
  SerializationBuffer.
- To integrate with the socket, basically only write(byte[] message, int 
  offset, int size) and write() need to be overridden. Unfortunately, write() 
  is final, so the following workaround is needed: add an interface 
  ListenerExt with a single method, writeExt(); change BufferServerPublisher 
  to implement ListenerExt; add DefaultEventLoopExt, which extends 
  DefaultEventLoop and overrides handleSelectedKey so that, for a selection 
  key with OP_WRITE, it calls ListenerExt.writeExt() if the key's attachment 
  implements ListenerExt, and write() otherwise.
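
The dispatch rule of the workaround can be sketched in isolation. The netlet 
listener and event-loop types are not reproduced here: WriteListener and 
WriteDispatch are hypothetical stand-ins, and only the OP_WRITE branch is 
shown. ListenerExt and writeExt() are the names proposed above.

```java
import java.nio.channels.SelectionKey;

// Hypothetical stand-in for the existing listener whose write() cannot be overridden.
interface WriteListener {
  void write();
}

// The extension interface proposed above, with its single method.
interface ListenerExt {
  void writeExt();
}

class WriteDispatch {
  // Mirrors the proposed handleSelectedKey override for the OP_WRITE case only:
  // prefer writeExt() when the attachment supports it, else fall back to write().
  static void onSelected(int readyOps, Object attachment) {
    if ((readyOps & SelectionKey.OP_WRITE) == 0) {
      return;
    }
    if (attachment instanceof ListenerExt) {
      ((ListenerExt) attachment).writeExt();
    } else if (attachment instanceof WriteListener) {
      ((WriteListener) attachment).write();
    }
  }
}
```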


> Proposal: Manage memory to avoid memory copy and garbage collection
> -------------------------------------------------------------------
>
>                 Key: APEXCORE-635
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-635
>             Project: Apache Apex Core
>          Issue Type: Wish
>            Reporter: bright chen
>            Assignee: bright chen



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
