[ 
https://issues.apache.org/jira/browse/FLINK-11775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798922#comment-16798922
 ] 

Piotr Nowojski commented on FLINK-11775:
----------------------------------------

Sorry if this is a stupid question; I'm not very familiar with this code. 
In that case, shouldn't we try to optimize 
{{HybridMemorySegment#put(java.io.DataInput, int, int)}} for the off-heap case? 
For example, when the {{DataInput}} is backed by an array or by something else 
that can easily and efficiently be wrapped as a {{ByteBuffer}}? Like:

{code:java}
        @Override
        public final void put(DataInput in, int offset, int length) throws IOException {
                if (address <= addressLimit) {
                        if (heapMemory != null) {
                                in.readFully(heapMemory, offset, length);
                        }
                        else {
                                // wrapAsByteBuffer() is the proposed new method; DataInput does not have it today.
                                // Sketch only: a real version must also honor offset and length.
                                ByteBuffer src = in.wrapAsByteBuffer();
                                offHeapBuffer.put(src);
                        }
                }
                else {
                        throw new IllegalStateException("segment has been freed");
                }
        }
{code}

and provide an efficient implementation of {{wrapAsByteBuffer()}} for 
{{DataInputView}}s that wrap a {{MemorySegment}}?
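
To make the idea a bit more concrete, here is a minimal, self-contained sketch 
of the off-heap path. Everything in it is an assumption for illustration: 
{{ArrayBackedInput}} and {{OffHeapSegmentSketch}} are hypothetical stand-ins 
(not Flink classes), and {{wrapAsByteBuffer(int)}} takes a length argument that 
the snippet above does not have. It only shows that an array-backed input can be 
exposed as a {{ByteBuffer}} view and bulk-copied into a direct buffer at a given 
offset.

```java
import java.nio.ByteBuffer;

/** Hypothetical array-backed input; stands in for a DataInputView over heap memory. */
final class ArrayBackedInput {
    private final byte[] data;
    private int position;

    ArrayBackedInput(byte[] data) {
        this.data = data;
    }

    /** Hypothetical method: exposes the next {@code length} bytes as a view, without copying. */
    ByteBuffer wrapAsByteBuffer(int length) {
        if (length < 0 || length > data.length - position) {
            throw new IndexOutOfBoundsException("not enough bytes remaining");
        }
        ByteBuffer view = ByteBuffer.wrap(data, position, length);
        position += length; // consume the bytes, as readFully would
        return view;
    }

    int position() {
        return position;
    }
}

/** Hypothetical stand-in for the off-heap branch of a memory segment. */
final class OffHeapSegmentSketch {
    private final ByteBuffer offHeapBuffer;

    OffHeapSegmentSketch(int size) {
        this.offHeapBuffer = ByteBuffer.allocateDirect(size);
    }

    void put(ArrayBackedInput in, int offset, int length) {
        ByteBuffer src = in.wrapAsByteBuffer(length);
        // duplicate() shares the memory but has its own position and limit,
        // so the segment's own buffer state is left untouched.
        ByteBuffer dst = offHeapBuffer.duplicate();
        dst.position(offset);
        dst.put(src); // one bulk copy instead of a per-byte or per-long loop
    }

    byte get(int index) {
        return offHeapBuffer.get(index);
    }
}

final class WrapAsByteBufferDemo {
    public static void main(String[] args) {
        ArrayBackedInput in = new ArrayBackedInput(new byte[] {1, 2, 3, 4, 5});
        OffHeapSegmentSketch segment = new OffHeapSegmentSketch(16);
        segment.put(in, 4, 3); // copy three bytes to offsets 4..6
        System.out.println(segment.get(4) + " " + segment.get(5) + " " + segment.get(6));
    }
}
```

Whether this is actually faster would depend on how often the input really is 
array-backed; inputs that are not would still need a fallback path.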

> Introduce MemorySegmentWritable to let DataOutputView direct copy to internal 
> bytes
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-11775
>                 URL: https://issues.apache.org/jira/browse/FLINK-11775
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Operators
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>
> Blink's new binary format is based on MemorySegment.
> Introduce MemorySegmentWritable so that a DataOutputView can copy directly to 
> its internal bytes.
> {code:java}
> /**
>  * Provides the interface for write(Segment).
>  */
> public interface MemorySegmentWritable {
>  /**
>  * Writes {@code len} bytes from memory segment {@code segment} starting at
>  * offset {@code off}, in order, to the output.
>  *
>  * @param segment memory segment to copy the bytes from.
>  * @param off the start offset in the memory segment.
>  * @param len The number of bytes to copy.
>  * @throws IOException if an I/O error occurs.
>  */
>  void write(MemorySegment segment, int off, int len) throws IOException;
> }{code}
>  
> If we want to write a MemorySegment to a DataOutputView, we need to copy its 
> bytes to a byte[] and then write that array, which is less efficient.
> If we let AbstractPagedOutputView have a write(MemorySegment) interface, we 
> can copy it directly.
> We need to ensure this in network serialization, batch operator calculation 
> serialization, and streaming state serialization, to avoid allocating a new 
> byte[] and the extra copy.
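
The difference between the two write paths in the quoted description can be 
sketched roughly as follows. This is not the proposed Flink code: 
{{BufferWritable}} and {{SinglePageOutput}} are hypothetical names, and a plain 
{{ByteBuffer}} stands in for {{MemorySegment}} so that the example runs without 
Flink on the classpath.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

/** Hypothetical analogue of the proposed interface, with ByteBuffer standing in for MemorySegment. */
interface BufferWritable {
    void write(ByteBuffer segment, int off, int len) throws IOException;
}

/** Hypothetical single-page output; a real paged view would advance to a new page when full. */
final class SinglePageOutput implements BufferWritable {
    private final ByteBuffer page;

    SinglePageOutput(int pageSize) {
        this.page = ByteBuffer.allocate(pageSize);
    }

    /** Baseline path: the caller has already copied the segment into a temporary byte[]. */
    void write(byte[] bytes, int off, int len) throws IOException {
        ensureCapacity(len);
        page.put(bytes, off, len);
    }

    /** Direct path: copies from the segment into the page with no intermediate array. */
    @Override
    public void write(ByteBuffer segment, int off, int len) throws IOException {
        ensureCapacity(len);
        ByteBuffer src = segment.duplicate(); // leave the caller's position and limit untouched
        src.limit(off + len);                 // set the limit first so position(off) is always valid
        src.position(off);
        page.put(src);
    }

    private void ensureCapacity(int len) throws IOException {
        if (len > page.remaining()) {
            throw new IOException("page full");
        }
    }

    int bytesWritten() {
        return page.position();
    }

    byte byteAt(int index) {
        return page.get(index);
    }
}
```

With the baseline path, a caller holding an off-heap segment first has to 
allocate a {{byte[]}} of {{len}} bytes and fill it; the direct path skips both 
the allocation and the first copy.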



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
