[ https://issues.apache.org/jira/browse/FLINK-994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14053626#comment-14053626 ]
ASF GitHub Bot commented on FLINK-994:
--------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/53#discussion_r14594163
--- Diff: stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/AbstractPagedInputView.java ---
@@ -183,44 +183,69 @@ protected void clear() {
// --------------------------------------------------------------------------------------------
// Data Input Specific methods
// --------------------------------------------------------------------------------------------
-
+
@Override
- public void readFully(byte[] b) throws IOException {
- readFully(b, 0, b.length);
+ public int read(byte[] b) throws IOException{
+ return read(b,0,b.length);
}
@Override
- public void readFully(byte[] b, int off, int len) throws IOException {
+ public int read(byte[] b, int off, int len) throws IOException{
if (off < 0 || len < 0 || off + len > b.length) {
throw new IndexOutOfBoundsException();
}
-
+
int remaining = this.limitInSegment - this.positionInSegment;
if (remaining >= len) {
this.currentSegment.get(this.positionInSegment, b, off, len);
this.positionInSegment += len;
+ return len;
}
else {
if (remaining == 0) {
- advance();
+ try {
+ advance();
+ }catch(EOFException eof){
+ return -1;
+ }
remaining = this.limitInSegment - this.positionInSegment;
}
-
+
+ int bytesRead = 0;
while (true) {
- int toRead = Math.min(remaining, len);
+ int toRead = Math.min(remaining, len-bytesRead);
this.currentSegment.get(this.positionInSegment, b, off, toRead);
off += toRead;
- len -= toRead;
-
- if (len > 0) {
- advance();
- remaining = this.limitInSegment - this.positionInSegment;
+ bytesRead += toRead;
+
+ if (len > bytesRead) {
+ try {
+ advance();
+ }catch(EOFException eof){
+ return bytesRead;
+ }
+ remaining = this.limitInSegment - this.positionInSegment;
}
else {
this.positionInSegment += toRead;
break;
}
}
+ return len;
+ }
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ readFully(b, 0, b.length);
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ int bytesRead = read(b,off,len);
+
+ if(bytesRead == -1){
--- End diff --
Yes, you are right, that's a bug. I'll add the test cases.
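For reference, the contract the diff is aiming at can be sketched in isolation: `read` returns the number of bytes copied, or -1 only when no data is left, and `readFully` treats anything shorter than `len` as end of input. The class below is a hypothetical stand-in (`PagedBytes`, backed by plain byte arrays) and not the actual `AbstractPagedInputView`. It assumes all segments are already in memory and does not claim to reproduce the fix that ends up in the pull request.

```java
import java.io.EOFException;
import java.io.IOException;

/** Hypothetical paged input over in-memory segments; illustration only. */
class PagedBytes {
    private final byte[][] segments;
    private int segmentIndex = 0;
    private int positionInSegment = 0;

    PagedBytes(byte[]... segments) {
        this.segments = segments;
    }

    /**
     * Copies up to len bytes into b starting at off.
     * Returns the number of bytes copied, or -1 if len > 0 and no data is left.
     */
    int read(byte[] b, int off, int len) throws IOException {
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (len == 0) {
            return 0;
        }
        int bytesRead = 0;
        while (bytesRead < len) {
            // Move past segments that are already consumed (or empty).
            while (segmentIndex < segments.length
                    && positionInSegment == segments[segmentIndex].length) {
                segmentIndex++;
                positionInSegment = 0;
            }
            if (segmentIndex == segments.length) {
                // End of data: -1 only if nothing was copied in this call.
                return bytesRead == 0 ? -1 : bytesRead;
            }
            byte[] segment = segments[segmentIndex];
            int toRead = Math.min(segment.length - positionInSegment, len - bytesRead);
            System.arraycopy(segment, positionInSegment, b, off + bytesRead, toRead);
            positionInSegment += toRead;
            bytesRead += toRead;
        }
        return bytesRead;
    }

    /** Reads exactly len bytes or throws EOFException. */
    void readFully(byte[] b, int off, int len) throws IOException {
        int bytesRead = read(b, off, len);
        // Covers both the -1 case and a short read at the end of the data.
        if (bytesRead < len) {
            throw new EOFException("Expected " + len + " bytes, got " + Math.max(bytesRead, 0));
        }
    }
}
```

Checking `bytesRead < len` rather than only `bytesRead == -1` means a partial read at the end of the data is also reported as an `EOFException`, which is what callers of `DataInput.readFully` expect.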
> Replace DataInput and DataOutput of IOReadableWritable with DataInputView and DataOutputView
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-994
> URL: https://issues.apache.org/jira/browse/FLINK-994
> Project: Flink
> Issue Type: Improvement
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> The DataInput and DataOutput interfaces are not well designed and offer only a
> limited function set. Therefore, we should replace DataInput and DataOutput
> with the Flink-defined DataInputView and DataOutputView in the
> IOReadableWritable interface. This is a preparatory step for the serializer
> and input/output abstraction rework.