This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new bbfb4d4  HBASE-24492 : Remove infinite loop from 
ProtobufLogReader#readNext (#1831)
bbfb4d4 is described below

commit bbfb4d432fa38b6bfabeea52ba5c2008a597e5ef
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Wed Jun 3 22:21:22 2020 +0530

    HBASE-24492 : Remove infinite loop from ProtobufLogReader#readNext (#1831)
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
---
 .../hbase/regionserver/wal/ProtobufLogReader.java  | 191 +++++++++++----------
 1 file changed, 97 insertions(+), 94 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 3edbc85..c72a40d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -325,115 +325,118 @@ public class ProtobufLogReader extends ReaderBase {
 
   @Override
   protected boolean readNext(Entry entry) throws IOException {
-    while (true) {
-      // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
-      long originalPosition = this.inputStream.getPos();
-      if (trailerPresent && originalPosition > 0 && originalPosition == 
this.walEditsStopOffset) {
+    // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
+    long originalPosition = this.inputStream.getPos();
+    if (trailerPresent && originalPosition > 0 && originalPosition == 
this.walEditsStopOffset) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Reached end of expected edits area at offset " + 
originalPosition);
+      }
+      return false;
+    }
+    WALKey.Builder builder = WALKey.newBuilder();
+    long size = 0;
+    boolean resetPosition = false;
+    try {
+      long available = -1;
+      try {
+        int firstByte = this.inputStream.read();
+        if (firstByte == -1) {
+          throw new EOFException("First byte is negative at offset " + 
originalPosition);
+        }
+        size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
+        // available may be < 0 on local fs for instance.  If so, can't depend 
on it.
+        available = this.inputStream.available();
+        if (available > 0 && available < size) {
+          throw new EOFException(
+            "Available stream not enough for edit, " + 
"inputStream.available()= "
+              + this.inputStream.available() + ", " + "entry size= " + size + 
" at offset = "
+              + this.inputStream.getPos());
+        }
+        ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, 
size), (int) size);
+      } catch (InvalidProtocolBufferException ipbe) {
+        resetPosition = true;
+        throw (EOFException) new EOFException(
+          "Invalid PB, EOF? Ignoring; originalPosition=" + originalPosition + 
", currentPosition="
+            + this.inputStream.getPos() + ", messageSize=" + size + ", 
currentAvailable="
+            + available).initCause(ipbe);
+      }
+      if (!builder.isInitialized()) {
+        // TODO: not clear if we should try to recover from corrupt PB that 
looks semi-legit.
+        //       If we can get the KV count, we could, theoretically, try to 
get next record.
+        throw new EOFException(
+          "Partial PB while reading WAL, " + "probably an unexpected EOF, 
ignoring. current offset="
+            + this.inputStream.getPos());
+      }
+      WALKey walKey = builder.build();
+      entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
+      if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Reached end of expected edits area at offset " + 
originalPosition);
+          LOG.trace("WALKey has no KVs that follow it; trying the next one. 
current offset="
+            + this.inputStream.getPos());
         }
+        seekOnFs(originalPosition);
         return false;
       }
-      WALKey.Builder builder = WALKey.newBuilder();
-      long size = 0;
-      boolean resetPosition = false;
+      int expectedCells = walKey.getFollowingKvCount();
+      long posBefore = this.inputStream.getPos();
       try {
-        long available = -1;
-        try {
-          int firstByte = this.inputStream.read();
-          if (firstByte == -1) {
-            throw new EOFException("First byte is negative at offset " + 
originalPosition);
-          }
-          size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
-          // available may be < 0 on local fs for instance.  If so, can't 
depend on it.
-          available = this.inputStream.available();
-          if (available > 0 && available < size) {
-            throw new EOFException("Available stream not enough for edit, " +
-                "inputStream.available()= " + this.inputStream.available() + 
", " +
-                "entry size= " + size + " at offset = " + 
this.inputStream.getPos());
-          }
-          ProtobufUtil.mergeFrom(builder, new 
LimitInputStream(this.inputStream, size),
-            (int)size);
-        } catch (InvalidProtocolBufferException ipbe) {
+        int actualCells = entry.getEdit().readFromCells(cellDecoder, 
expectedCells);
+        if (expectedCells != actualCells) {
           resetPosition = true;
-          throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; 
originalPosition=" +
-            originalPosition + ", currentPosition=" + 
this.inputStream.getPos() +
-            ", messageSize=" + size + ", currentAvailable=" + 
available).initCause(ipbe);
-        }
-        if (!builder.isInitialized()) {
-          // TODO: not clear if we should try to recover from corrupt PB that 
looks semi-legit.
-          //       If we can get the KV count, we could, theoretically, try to 
get next record.
-          throw new EOFException("Partial PB while reading WAL, " +
-              "probably an unexpected EOF, ignoring. current offset=" + 
this.inputStream.getPos());
+          throw new EOFException("Only read " + actualCells); // other info 
added in catch
         }
-        WALKey walKey = builder.build();
-        entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
-        if (!walKey.hasFollowingKvCount() || 0 == 
walKey.getFollowingKvCount()) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("WALKey has no KVs that follow it; trying the next one. 
current offset=" + this.inputStream.getPos());
-          }
-          seekOnFs(originalPosition);
-          return false;
-        }
-        int expectedCells = walKey.getFollowingKvCount();
-        long posBefore = this.inputStream.getPos();
+      } catch (Exception ex) {
+        String posAfterStr = "<unknown>";
         try {
-          int actualCells = entry.getEdit().readFromCells(cellDecoder, 
expectedCells);
-          if (expectedCells != actualCells) {
-            resetPosition = true;
-            throw new EOFException("Only read " + actualCells); // other info 
added in catch
-          }
-        } catch (Exception ex) {
-          String posAfterStr = "<unknown>";
-          try {
-            posAfterStr = this.inputStream.getPos() + "";
-          } catch (Throwable t) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Error getting pos for error message - ignoring", t);
-            }
+          posAfterStr = this.inputStream.getPos() + "";
+        } catch (Throwable t) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Error getting pos for error message - ignoring", t);
           }
-          String message = " while reading " + expectedCells + " WAL KVs; 
started reading at "
-              + posBefore + " and read up to " + posAfterStr;
-          IOException realEofEx = extractHiddenEof(ex);
-          throw (EOFException) new EOFException("EOF " + message).
-              initCause(realEofEx != null ? realEofEx : ex);
         }
-        if (trailerPresent && this.inputStream.getPos() > 
this.walEditsStopOffset) {
-          LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path
-              + ", inputStream.getPos(): " + this.inputStream.getPos() + ", 
walEditsStopOffset: "
-              + this.walEditsStopOffset);
-          throw new EOFException("Read WALTrailer while reading WALEdits");
+        String message =
+          " while reading " + expectedCells + " WAL KVs; started reading at " 
+ posBefore
+            + " and read up to " + posAfterStr;
+        IOException realEofEx = extractHiddenEof(ex);
+        throw (EOFException) new EOFException("EOF " + message).
+          initCause(realEofEx != null ? realEofEx : ex);
+      }
+      if (trailerPresent && this.inputStream.getPos() > 
this.walEditsStopOffset) {
+        LOG.error(
+          "Read WALTrailer while reading WALEdits. wal: " + this.path + ", 
inputStream.getPos(): "
+            + this.inputStream.getPos() + ", walEditsStopOffset: " + 
this.walEditsStopOffset);
+        throw new EOFException("Read WALTrailer while reading WALEdits");
+      }
+    } catch (EOFException eof) {
+      // If originalPosition is < 0, it is rubbish and we cannot use it 
(probably local fs)
+      if (originalPosition < 0) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Encountered a malformed edit, but can't seek back to last 
good position "
+              + "because originalPosition is negative. last offset=" + 
this.inputStream.getPos(),
+            eof);
         }
-      } catch (EOFException eof) {
-        // If originalPosition is < 0, it is rubbish and we cannot use it 
(probably local fs)
-        if (originalPosition < 0) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Encountered a malformed edit, but can't seek back to 
last good position "
-                + "because originalPosition is negative. last offset="
-                + this.inputStream.getPos(), eof);
-          }
-          throw eof;
+        throw eof;
+      }
+      // If stuck at the same place and we got and exception, lets go back at 
the beginning.
+      if (inputStream.getPos() == originalPosition && resetPosition) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Encountered a malformed edit, seeking to the beginning of 
the WAL since "
+            + "current position and original position match at " + 
originalPosition);
         }
-        // If stuck at the same place and we got and exception, lets go back 
at the beginning.
-        if (inputStream.getPos() == originalPosition && resetPosition) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Encountered a malformed edit, seeking to the beginning 
of the WAL since "
-                + "current position and original position match at " + 
originalPosition);
-          }
-          seekOnFs(0);
-        } else {
-          // Else restore our position to original location in hope that next 
time through we will
-          // read successfully.
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Encountered a malformed edit, seeking back to last good 
position in file, "
-                + "from " + inputStream.getPos()+" to " + originalPosition, 
eof);
-          }
-          seekOnFs(originalPosition);
+        seekOnFs(0);
+      } else {
+        // Else restore our position to original location in hope that next 
time through we will
+        // read successfully.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(
+            "Encountered a malformed edit, seeking back to last good position 
in file, " + "from "
+              + inputStream.getPos() + " to " + originalPosition, eof);
         }
-        return false;
+        seekOnFs(originalPosition);
       }
-      return true;
+      return false;
     }
+    return true;
   }
 
   private IOException extractHiddenEof(Exception ex) {

Reply via email to