This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch branch-1 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push: new bbfb4d4 HBASE-24492 : Remove infinite loop from ProtobufLogReader#readNext (#1831) bbfb4d4 is described below commit bbfb4d432fa38b6bfabeea52ba5c2008a597e5ef Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Wed Jun 3 22:21:22 2020 +0530 HBASE-24492 : Remove infinite loop from ProtobufLogReader#readNext (#1831) Signed-off-by: Duo Zhang <zhang...@apache.org> --- .../hbase/regionserver/wal/ProtobufLogReader.java | 191 +++++++++++---------- 1 file changed, 97 insertions(+), 94 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 3edbc85..c72a40d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -325,115 +325,118 @@ public class ProtobufLogReader extends ReaderBase { @Override protected boolean readNext(Entry entry) throws IOException { - while (true) { - // OriginalPosition might be < 0 on local fs; if so, it is useless to us. - long originalPosition = this.inputStream.getPos(); - if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) { + // OriginalPosition might be < 0 on local fs; if so, it is useless to us. + long originalPosition = this.inputStream.getPos(); + if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) { + if (LOG.isTraceEnabled()) { + LOG.trace("Reached end of expected edits area at offset " + originalPosition); + } + return false; + } + WALKey.Builder builder = WALKey.newBuilder(); + long size = 0; + boolean resetPosition = false; + try { + long available = -1; + try { + int firstByte = this.inputStream.read(); + if (firstByte == -1) { + throw new EOFException("First byte is negative at offset " + originalPosition); + } + size = CodedInputStream.readRawVarint32(firstByte, this.inputStream); + // available may be < 0 on local fs for instance. If so, can't depend on it. + available = this.inputStream.available(); + if (available > 0 && available < size) { + throw new EOFException( + "Available stream not enough for edit, " + "inputStream.available()= " + + this.inputStream.available() + ", " + "entry size= " + size + " at offset = " + + this.inputStream.getPos()); + } + ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size), (int) size); + } catch (InvalidProtocolBufferException ipbe) { + resetPosition = true; + throw (EOFException) new EOFException( + "Invalid PB, EOF? Ignoring; originalPosition=" + originalPosition + ", currentPosition=" + + this.inputStream.getPos() + ", messageSize=" + size + ", currentAvailable=" + + available).initCause(ipbe); + } + if (!builder.isInitialized()) { + // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit. + // If we can get the KV count, we could, theoretically, try to get next record. + throw new EOFException( + "Partial PB while reading WAL, " + "probably an unexpected EOF, ignoring. current offset=" + + this.inputStream.getPos()); + } + WALKey walKey = builder.build(); + entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor); + if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) { if (LOG.isTraceEnabled()) { - LOG.trace("Reached end of expected edits area at offset " + originalPosition); + LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + + this.inputStream.getPos()); } + seekOnFs(originalPosition); return false; } - WALKey.Builder builder = WALKey.newBuilder(); - long size = 0; - boolean resetPosition = false; + int expectedCells = walKey.getFollowingKvCount(); + long posBefore = this.inputStream.getPos(); try { - long available = -1; - try { - int firstByte = this.inputStream.read(); - if (firstByte == -1) { - throw new EOFException("First byte is negative at offset " + originalPosition); - } - size = CodedInputStream.readRawVarint32(firstByte, this.inputStream); - // available may be < 0 on local fs for instance. If so, can't depend on it. - available = this.inputStream.available(); - if (available > 0 && available < size) { - throw new EOFException("Available stream not enough for edit, " + - "inputStream.available()= " + this.inputStream.available() + ", " + - "entry size= " + size + " at offset = " + this.inputStream.getPos()); - } - ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size), - (int)size); - } catch (InvalidProtocolBufferException ipbe) { + int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells); + if (expectedCells != actualCells) { resetPosition = true; - throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" + - originalPosition + ", currentPosition=" + this.inputStream.getPos() + - ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe); - } - if (!builder.isInitialized()) { - // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit. - // If we can get the KV count, we could, theoretically, try to get next record. - throw new EOFException("Partial PB while reading WAL, " + - "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos()); + throw new EOFException("Only read " + actualCells); // other info added in catch } - WALKey walKey = builder.build(); - entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor); - if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) { - if (LOG.isTraceEnabled()) { - LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + this.inputStream.getPos()); - } - seekOnFs(originalPosition); - return false; - } - int expectedCells = walKey.getFollowingKvCount(); - long posBefore = this.inputStream.getPos(); + } catch (Exception ex) { + String posAfterStr = "<unknown>"; try { - int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells); - if (expectedCells != actualCells) { - resetPosition = true; - throw new EOFException("Only read " + actualCells); // other info added in catch - } - } catch (Exception ex) { - String posAfterStr = "<unknown>"; - try { - posAfterStr = this.inputStream.getPos() + ""; - } catch (Throwable t) { - if (LOG.isTraceEnabled()) { - LOG.trace("Error getting pos for error message - ignoring", t); - } + posAfterStr = this.inputStream.getPos() + ""; + } catch (Throwable t) { + if (LOG.isTraceEnabled()) { + LOG.trace("Error getting pos for error message - ignoring", t); } - String message = " while reading " + expectedCells + " WAL KVs; started reading at " - + posBefore + " and read up to " + posAfterStr; - IOException realEofEx = extractHiddenEof(ex); - throw (EOFException) new EOFException("EOF " + message). - initCause(realEofEx != null ? realEofEx : ex); } - if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) { - LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path - + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: " - + this.walEditsStopOffset); - throw new EOFException("Read WALTrailer while reading WALEdits"); + String message = + " while reading " + expectedCells + " WAL KVs; started reading at " + posBefore + + " and read up to " + posAfterStr; + IOException realEofEx = extractHiddenEof(ex); + throw (EOFException) new EOFException("EOF " + message). + initCause(realEofEx != null ? realEofEx : ex); + } + if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) { + LOG.error( + "Read WALTrailer while reading WALEdits. wal: " + this.path + ", inputStream.getPos(): " + + this.inputStream.getPos() + ", walEditsStopOffset: " + this.walEditsStopOffset); + throw new EOFException("Read WALTrailer while reading WALEdits"); + } + } catch (EOFException eof) { + // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs) + if (originalPosition < 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("Encountered a malformed edit, but can't seek back to last good position " + + "because originalPosition is negative. last offset=" + this.inputStream.getPos(), + eof); } - } catch (EOFException eof) { - // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs) - if (originalPosition < 0) { - if (LOG.isTraceEnabled()) { - LOG.trace("Encountered a malformed edit, but can't seek back to last good position " - + "because originalPosition is negative. last offset=" - + this.inputStream.getPos(), eof); - } - throw eof; + throw eof; + } + // If stuck at the same place and we got and exception, lets go back at the beginning. + if (inputStream.getPos() == originalPosition && resetPosition) { + if (LOG.isTraceEnabled()) { + LOG.trace("Encountered a malformed edit, seeking to the beginning of the WAL since " + + "current position and original position match at " + originalPosition); } - // If stuck at the same place and we got and exception, lets go back at the beginning. - if (inputStream.getPos() == originalPosition && resetPosition) { - if (LOG.isTraceEnabled()) { - LOG.trace("Encountered a malformed edit, seeking to the beginning of the WAL since " - + "current position and original position match at " + originalPosition); - } - seekOnFs(0); - } else { - // Else restore our position to original location in hope that next time through we will - // read successfully. - if (LOG.isTraceEnabled()) { - LOG.trace("Encountered a malformed edit, seeking back to last good position in file, " - + "from " + inputStream.getPos()+" to " + originalPosition, eof); - } - seekOnFs(originalPosition); + seekOnFs(0); + } else { + // Else restore our position to original location in hope that next time through we will + // read successfully. + if (LOG.isTraceEnabled()) { + LOG.trace( + "Encountered a malformed edit, seeking back to last good position in file, " + "from " + + inputStream.getPos() + " to " + originalPosition, eof); } - return false; + seekOnFs(originalPosition); } - return true; + return false; } + return true; } private IOException extractHiddenEof(Exception ex) {