[ https://issues.apache.org/jira/browse/FLINK-994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14053609#comment-14053609 ]

ASF GitHub Bot commented on FLINK-994:
--------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/53#discussion_r14593600
  
    --- Diff: stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java ---
    @@ -183,44 +183,69 @@ protected void clear() {
        // --------------------------------------------------------------------------------------------
        //                               Data Input Specific methods
        // --------------------------------------------------------------------------------------------
    -   
    +
        @Override
    -   public void readFully(byte[] b) throws IOException {
    -           readFully(b, 0, b.length);
    +   public int read(byte[] b) throws IOException{
    +           return read(b,0,b.length);
        }
     
        @Override
    -   public void readFully(byte[] b, int off, int len) throws IOException {
    +   public int read(byte[] b, int off, int len) throws IOException{
                if (off < 0 || len < 0 || off + len > b.length) {
                        throw new IndexOutOfBoundsException();
                }
    -           
    +
                int remaining = this.limitInSegment - this.positionInSegment;
                if (remaining >= len) {
                        this.currentSegment.get(this.positionInSegment, b, off, len);
                        this.positionInSegment += len;
    +                   return len;
                }
                else {
                        if (remaining == 0) {
    -                           advance();
    +                           try {
    +                                   advance();
    +                           }catch(EOFException eof){
    +                                   return -1;
    +                           }
                                remaining = this.limitInSegment - this.positionInSegment;
                        }
    -                   
    +
    +                   int bytesRead = 0;
                        while (true) {
    -                           int toRead = Math.min(remaining, len);
    +                           int toRead = Math.min(remaining, len-bytesRead);
                                this.currentSegment.get(this.positionInSegment, b, off, toRead);
                                off += toRead;
    -                           len -= toRead;
    -                           
    -                           if (len > 0) {
    -                                   advance();
    -                                   remaining = this.limitInSegment - this.positionInSegment;
    +                           bytesRead += toRead;
    +
    +                           if (len > bytesRead) {
    +                                   try {
    +                                           advance();
    +                                   }catch(EOFException eof){
    +                                           return bytesRead;
    +                                   }
    +                                   remaining = this.limitInSegment - this.positionInSegment;
                                }
                                else {
                                        this.positionInSegment += toRead;
                                        break;
                                }
                        }
    +                   return len;
    +           }
    +   }
    +   
    +   @Override
    +   public void readFully(byte[] b) throws IOException {
    +           readFully(b, 0, b.length);
    +   }
    +
    +   @Override
    +   public void readFully(byte[] b, int off, int len) throws IOException {
    +           int bytesRead = read(b,off,len);
    +
    +           if(bytesRead == -1){
    --- End diff --
    
    Could you add a small test for this method? Seems like complex logic...
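
A minimal sketch of what such a test could check, not the actual test from the pull request. It assumes a simplified stand-in class (`SegmentedReadSketch`, a hypothetical name) backed by plain `byte[]` segments instead of the real `AbstractPagedInputView` and its memory segments. It also assumes the intended contract: `read` returns the number of bytes copied, a short count when the data ends mid-request, and -1 when nothing is left, while `readFully` throws an `EOFException` if the range cannot be filled.

```java
import java.io.EOFException;

// Hypothetical stand-in for a paged input view backed by fixed byte[] segments.
// This is NOT the real AbstractPagedInputView; it only mirrors the assumed
// read()/readFully() contract so the cases worth testing are visible.
class SegmentedReadSketch {
    private final byte[][] segments;
    private int segmentIndex = 0;
    private int positionInSegment = 0;

    SegmentedReadSketch(byte[]... segments) {
        this.segments = segments;
    }

    // Returns the number of bytes copied, or -1 if no bytes are left at all.
    int read(byte[] b, int off, int len) {
        if (off < 0 || len < 0 || off + len > b.length) {
            throw new IndexOutOfBoundsException();
        }
        int bytesRead = 0;
        while (bytesRead < len) {
            // Move past exhausted segments (the analogue of advance()).
            while (segmentIndex < segments.length
                    && positionInSegment == segments[segmentIndex].length) {
                segmentIndex++;
                positionInSegment = 0;
            }
            if (segmentIndex == segments.length) {
                // End of data: report a partial read, or -1 if nothing was read.
                return bytesRead == 0 ? -1 : bytesRead;
            }
            int remaining = segments[segmentIndex].length - positionInSegment;
            int toRead = Math.min(remaining, len - bytesRead);
            System.arraycopy(segments[segmentIndex], positionInSegment,
                    b, off + bytesRead, toRead);
            positionInSegment += toRead;
            bytesRead += toRead;
        }
        return bytesRead;
    }

    // Assumed contract: fill the requested range completely or throw EOFException.
    void readFully(byte[] b, int off, int len) throws EOFException {
        if (read(b, off, len) < len) {
            throw new EOFException();
        }
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        SegmentedReadSketch view = new SegmentedReadSketch(
                new byte[] {1, 2, 3}, new byte[] {4, 5}, new byte[] {6});
        byte[] buf = new byte[4];

        check(view.read(buf, 0, 2) == 2, "read within one segment");
        check(view.read(buf, 0, 3) == 3 && buf[0] == 3 && buf[2] == 5,
                "read across a segment boundary");
        check(view.read(buf, 0, 4) == 1 && buf[0] == 6, "partial read at end of data");
        check(view.read(buf, 0, 4) == -1, "read after end of data");

        boolean eof = false;
        try {
            new SegmentedReadSketch(new byte[] {1, 2}).readFully(new byte[3], 0, 3);
        } catch (EOFException e) {
            eof = true;
        }
        check(eof, "readFully past the end should throw");
    }
}
```

The four `read` cases (within a segment, across a boundary, partial read at the end, and end of data) correspond to the branches in the diff, so a test against the real class would probably want at least one assertion per branch.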


> Replace DataInput and DataOutput of IOReadableWritable with DataInputView and DataOutputView
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-994
>                 URL: https://issues.apache.org/jira/browse/FLINK-994
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> The DataInput and DataOutput interfaces are not well designed and offer only 
> a limited set of functions. Therefore, we should replace DataInput and 
> DataOutput with the Flink-defined DataInputView and DataOutputView in the 
> IOReadableWritable interface. This is a preparatory step for the serializer 
> and input/output abstraction rework.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
