[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15351979#comment-15351979
 ] 

bright chen commented on APEXMALHAR-2126:
-----------------------------------------

Let me take endWindow() of
org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl<K, V> as an
example.

  public void endWindow()
  {
    for (K key: cache.getChangedKeys()) {
      store.put(this.bucket,
          SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
          SliceUtils.concatenate(identifier, serdeValue.serialize(cache.get(key))));
    }
    …
  }

  public static Slice concatenate(byte[] a, Slice b)
  {
    int size = a.length + b.length;
    byte[] bytes = new byte[size];

    System.arraycopy(a, 0, bytes, 0, a.length);
    System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);

    return new Slice(bytes);
  }

So every call to concatenate() allocates a new byte array, which means two
allocations per changed key in each endWindow().

With the new approach, these allocations can be avoided:
  private transient BufferSlice slice = new BufferSlice(SIZE);
  public void endWindow()
  {
    slice.clear();
    for (K key: cache.getChangedKeys()) {
      store.put(this.bucket,
          slice.append(identifier).append(serdeKey.serialize(key)).endSlice(),
          slice.append(identifier).append(serdeValue.serialize(cache.get(key))).endSlice());
    }
    …
  }

The class could be implemented as follows:
public class BufferSlice
{
  protected byte[] buffer;
  protected int offset;
  protected int length;
  
  public BufferSlice(int size)
  {
    buffer = new byte[size];
  }
  
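  public BufferSlice append(byte[] data)
  {
    //TODO: handle over sized
    // Copy data into the shared buffer, right after the bytes already
    // appended to the current slice.
    System.arraycopy(data, 0, buffer, offset + length, data.length);
    length += data.length;
    // Return this so that calls can be chained, as endWindow() above does.
    return this;
  }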
  
  public Slice endSlice()
  {
    Slice slice = new Slice(buffer, offset, length);
    offset += length;
    length = 0;
    return slice;
  }
}
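
To make the idea concrete, here is a minimal, self-contained sketch of the
fixed-capacity variant. It is only an illustration under a few assumptions:
the nested Slice class is a stand-in for com.datatorrent.netlet.util.Slice
(just the three fields), clear() is assumed to rewind the buffer to the start,
the append(Slice) overload is assumed because serialize() returns a Slice in
the code above, and an over-sized append simply throws
BufferOverflowException. The name SharedBufferSketch is hypothetical.

```java
import java.nio.BufferOverflowException;

// Sketch only, not the Malhar implementation.
class SharedBufferSketch
{
  // Stand-in for com.datatorrent.netlet.util.Slice (assumption: only these fields are needed).
  static final class Slice
  {
    final byte[] buffer;
    final int offset;
    final int length;

    Slice(byte[] buffer, int offset, int length)
    {
      this.buffer = buffer;
      this.offset = offset;
      this.length = length;
    }
  }

  static final class BufferSlice
  {
    private final byte[] buffer;
    private int offset; // start of the slice currently being built
    private int length; // bytes appended to that slice so far

    BufferSlice(int size)
    {
      buffer = new byte[size];
    }

    // Assumed semantics: rewind so the whole buffer can be reused.
    void clear()
    {
      offset = 0;
      length = 0;
    }

    BufferSlice append(byte[] data)
    {
      return append(data, 0, data.length);
    }

    BufferSlice append(Slice s)
    {
      return append(s.buffer, s.offset, s.length);
    }

    BufferSlice append(byte[] data, int from, int count)
    {
      // Fixed-capacity strategy: refuse the append instead of growing.
      if (offset + length + count > buffer.length) {
        throw new BufferOverflowException();
      }
      System.arraycopy(data, from, buffer, offset + length, count);
      length += count;
      return this;
    }

    // Hands out a view of the bytes appended since the previous endSlice().
    Slice endSlice()
    {
      Slice s = new Slice(buffer, offset, length);
      offset += length;
      length = 0;
      return s;
    }
  }

  public static void main(String[] args)
  {
    BufferSlice shared = new BufferSlice(64);
    byte[] identifier = {1, 2};
    Slice key = shared.append(identifier).append(new byte[] {10}).endSlice();
    Slice value = shared.append(identifier).append(new byte[] {20, 21}).endSlice();
    System.out.println(key.buffer == value.buffer); // prints "true"
    System.out.println(key.offset + "," + key.length + " " + value.offset + "," + value.length); // prints "0,3 3,4"
  }
}
```

One caveat with this design: every Slice returned by endSlice() is a view into
the shared array, so its bytes are overwritten once clear() is called and the
buffer is reused. The approach therefore assumes the consumer (store.put() in
the example above) copies or finishes with the data before the next clear().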


> Suggest: Share Slice Buffer
> ---------------------------
>
>                 Key: APEXMALHAR-2126
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2126
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: bright chen
>
> I think the intention of Slice (com.datatorrent.netlet.util.Slice) was to 
> share the buffer and avoid unnecessary memory allocation/deallocation. But 
> the intention is not self-explanatory, and there is no method for sharing 
> the memory. The util class org.apache.apex.malhar.lib.utils.serde.SliceUtils 
> also allocates new memory and copies the data.
> I suggest implementing another class (say BufferSlice), which 
> - initializes itself with a relatively large buffer
> - supports append(byte[] data, int offset, int length)
> - dynamically reallocates the buffer or throws an exception when the buffer 
> is full (based on the management strategy)
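
The "dynamically reallocate" strategy from the list above could look roughly
like the following sketch. The names GrowingBufferSketch and GrowableBuffer,
the doubling growth policy, and the nested Slice stand-in (in place of
com.datatorrent.netlet.util.Slice) are all assumptions made for illustration,
not part of Malhar.

```java
import java.util.Arrays;

// Sketch only: a shared buffer that grows instead of throwing when it is full.
class GrowingBufferSketch
{
  // Stand-in for com.datatorrent.netlet.util.Slice (assumption).
  static final class Slice
  {
    final byte[] buffer;
    final int offset;
    final int length;

    Slice(byte[] buffer, int offset, int length)
    {
      this.buffer = buffer;
      this.offset = offset;
      this.length = length;
    }
  }

  static final class GrowableBuffer
  {
    private byte[] buffer;
    private int offset; // start of the slice currently being built
    private int length; // bytes appended to that slice so far

    GrowableBuffer(int initialSize)
    {
      buffer = new byte[Math.max(1, initialSize)];
    }

    GrowableBuffer append(byte[] data, int from, int count)
    {
      int required = offset + length + count;
      if (required > buffer.length) {
        // Assumed policy: at least double, so repeated appends stay cheap on average.
        buffer = Arrays.copyOf(buffer, Math.max(required, buffer.length * 2));
      }
      System.arraycopy(data, from, buffer, offset + length, count);
      length += count;
      return this;
    }

    Slice endSlice()
    {
      Slice s = new Slice(buffer, offset, length);
      offset += length;
      length = 0;
      return s;
    }
  }

  public static void main(String[] args)
  {
    GrowableBuffer buf = new GrowableBuffer(4);
    Slice first = buf.append(new byte[] {1, 2, 3}, 0, 3).endSlice();
    // Appends bytes 4, 5, 6 (a sub-range of the input) and forces a reallocation.
    Slice second = buf.append(new byte[] {9, 4, 5, 6, 9}, 1, 3).endSlice();
    System.out.println(first.buffer == second.buffer); // prints "false"
    System.out.println(second.buffer.length);          // prints "8"
  }
}
```

Slices handed out before a reallocation keep pointing at the old array, so
they stay valid; they just no longer share memory with later slices. Choosing
a large enough initial size keeps reallocation rare, which is the point of the
"relatively large buffer" suggestion.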



