Repository: carbondata Updated Branches: refs/heads/master e2a2d9931 -> 59eff88b0
[CARBONDATA-1781] Fix EOFException in StreamBlockletReader In the StreamBlockletReader.readBytesFromStream method, if fewer bytes are read than requested, continue reading until the requested length is filled or the end of the stream is reached. This closes #1621 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/59eff88b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/59eff88b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/59eff88b Branch: refs/heads/master Commit: 59eff88b02817d711e40bffb2b065ba0c261d395 Parents: e2a2d99 Author: QiangCai <[email protected]> Authored: Wed Dec 6 16:32:56 2017 +0800 Committer: Jacky Li <[email protected]> Committed: Wed Dec 6 22:47:15 2017 +0800 ---------------------------------------------------------------------- .../hadoop/streaming/StreamBlockletReader.java | 45 ++++++++++++-------- 1 file changed, 28 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/59eff88b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java index eafb142..1989198 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java @@ -48,11 +48,11 @@ public class StreamBlockletReader { StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) { this.syncMarker = syncMarker; - this.syncLen = syncMarker.length; - this.syncBuffer = new byte[syncMarker.length]; + syncLen = syncMarker.length; + syncBuffer = new byte[syncLen]; this.in = in; - this.limitStart = limit; - 
this.limitEnd = limitStart + syncLen; + limitStart = limit; + limitEnd = limitStart + syncLen; this.isHeaderPresent = isHeaderPresent; } @@ -66,11 +66,9 @@ public class StreamBlockletReader { * find the first position of sync_marker in input stream */ private boolean sync() throws IOException { - int len = in.read(syncBuffer); - if (len < syncLen) { + if (!readBytesFromStream(syncBuffer, 0, syncLen)) { return false; } - pos += syncLen; boolean skipHeader = false; for (int i = 0; i < limitStart; i++) { int j = 0; @@ -101,7 +99,9 @@ public class StreamBlockletReader { BlockletHeader readBlockletHeader() throws IOException { int len = readIntFromStream(); byte[] b = new byte[len]; - readBytesFromStream(b); + if (!readBytesFromStream(b, 0, len)) { + throw new EOFException("Failed to read blocklet header"); + } BlockletHeader header = CarbonUtil.readBlockletHeader(b); rowNums = header.getBlocklet_info().getNum_rows(); rowIndex = 0; @@ -113,7 +113,9 @@ public class StreamBlockletReader { offset = 0; int len = readIntFromStream(); byte[] b = new byte[len]; - readBytesFromStream(b); + if (!readBytesFromStream(b, 0, len)) { + throw new EOFException("Failed to read blocklet data"); + } compressor.rawUncompress(b, buffer); } @@ -143,11 +145,9 @@ public class StreamBlockletReader { return false; } if (isAlreadySync) { - int v = in.read(syncBuffer); - if (v < syncLen) { + if (!readBytesFromStream(syncBuffer, 0, syncLen)) { return false; } - pos += syncLen; } else { isAlreadySync = true; if (!sync()) { @@ -176,12 +176,23 @@ public class StreamBlockletReader { return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); } - void readBytesFromStream(byte[] b) throws IOException { - int len = in.read(b, 0, b.length); - if (len < b.length) { - throw new EOFException(); + /** + * Reads <code>len</code> bytes of data from the input stream into + * an array of bytes. 
+ * @return <code>true</code> if reading data successfully, or + * <code>false</code> if there is no more data because the end of the stream has been reached. + */ + boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException { + int readLen = in.read(b, offset, len); + if (readLen < 0) { + return false; + } + pos += readLen; + if (readLen < len) { + return readBytesFromStream(b, offset + readLen, len - readLen); + } else { + return true; } - pos += b.length; } boolean readBoolean() {
