virajjasani commented on code in PR #5059:
URL: https://github.com/apache/hbase/pull/5059#discussion_r1118135225
##########
hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java:
##########
@@ -3700,4 +3702,52 @@ public static ClusterStatusProtos.ServerTask
toServerTask(ServerTask task) {
.setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build();
}
+ /**
+ * Check whether this IPBE indicates EOF or not.
+ * <p/>
+ * We will check the exception message, if it is likely the one of
+ * InvalidProtocolBufferException.truncatedMessage, we will consider it as
EOF, otherwise not.
+ */
+ public static boolean isEOF(InvalidProtocolBufferException e) {
+ return e.getMessage().contains("input has been truncated");
Review Comment:
I wonder whether PB provides a constant for "input has been truncated"
that we could use here. I am just worried about PB version compatibility.
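If no public constant turns out to exist (I am not aware of one), the fragment could at least live in one named place with a null-safe check. A minimal sketch of that idea follows. `TruncationCheck`, `TRUNCATED_FRAGMENT` and `looksTruncated` are hypothetical names, and `IOException` stands in for `InvalidProtocolBufferException` (which extends it) so that the snippet compiles without protobuf on the classpath:

```java
import java.io.IOException;

final class TruncationCheck {
  // Assumption: this fragment is copied from the isEOF method in this PR.
  // It is not a constant exposed by protobuf itself.
  static final String TRUNCATED_FRAGMENT = "input has been truncated";

  private TruncationCheck() {
  }

  // Returns true only when the exception carries a message containing the fragment.
  // A null message is treated as "not EOF" instead of causing a NullPointerException.
  static boolean looksTruncated(IOException e) {
    String msg = e.getMessage();
    return msg != null && msg.contains(TRUNCATED_FRAGMENT);
  }
}
```

This does not remove the dependency on the message wording, but a unit test against the shaded PB version could then fail fast if the text ever changes.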
##########
hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java:
##########
@@ -3700,4 +3702,52 @@ public static ClusterStatusProtos.ServerTask
toServerTask(ServerTask task) {
.setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build();
}
+ /**
+ * Check whether this IPBE indicates EOF or not.
+ * <p/>
+ * We will check the exception message, if it is likely the one of
+ * InvalidProtocolBufferException.truncatedMessage, we will consider it as
EOF, otherwise not.
+ */
+ public static boolean isEOF(InvalidProtocolBufferException e) {
+ return e.getMessage().contains("input has been truncated");
+ }
+
+ /**
+ * This is a wrapper of the PB message's parseDelimitedFrom. The difference
is, if we can not
+ * determine whether there are enough bytes in stream, i.e, the available
method does not have a
+ * valid return value, we will try to read all the bytes to a byte array
first, and then parse the
+ * pb message with {@link Parser#parseFrom(byte[])} instead of call
+ * {@link Parser#parseDelimitedFrom(InputStream)} directly. This is because
even if the bytes are
+ * not enough bytes, {@link Parser#parseDelimitedFrom(InputStream)} could
still return without any
+ * errors but just leave us a partial PB message.
+ * @return The PB message if we can parse it successfully, otherwise there
will always be an
+ * exception thrown, will never return {@code null}.
+ */
+ public static <T extends Message> T parseDelimitedFrom(InputStream in,
Parser<T> parser)
+ throws IOException {
+ int firstByte = in.read();
+ if (firstByte < 0) {
+ throw new EOFException("EOF while reading message size");
+ }
+ int size = CodedInputStream.readRawVarint32(firstByte, in);
+ int available = in.available();
+ if (available > 0) {
+ if (available < size) {
+ throw new EOFException("Available bytes not enough for parsing PB
message, expect at least "
+ + size + " bytes, but only " + available + " bytes available");
+ }
+ // this piece of code is copied from GeneratedMessageV3.parseFrom
+ try {
+ return parser.parseFrom(ByteStreams.limit(in, size));
+ } catch (InvalidProtocolBufferException e) {
+ throw e.unwrapIOException();
Review Comment:
Shall we also log `InvalidProtocolBufferException` before throwing here?
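Roughly what I have in mind, as a sketch only. `LogBeforeUnwrap` and `logAndUnwrap` are hypothetical names, `java.util.logging` is used here instead of the project's logger to keep the snippet standalone, and the cause check is my assumption of what `unwrapIOException()` does (return the `IOException` cause if there is one, otherwise the exception itself):

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LogBeforeUnwrap {
  private static final Logger LOG = Logger.getLogger(LogBeforeUnwrap.class.getName());

  private LogBeforeUnwrap() {
  }

  // Logs the original parse failure, then returns the exception to rethrow.
  // IOException stands in for InvalidProtocolBufferException in this sketch.
  static IOException logAndUnwrap(IOException parseFailure) {
    LOG.log(Level.FINE, "Failed to parse delimited PB message", parseFailure);
    Throwable cause = parseFailure.getCause();
    // Assumed unwrap behavior: prefer the underlying IOException when present.
    return cause instanceof IOException ? (IOException) cause : parseFailure;
  }
}
```

A low log level keeps this from being noisy when a tailing reader regularly hits a partial message.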
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java:
##########
@@ -358,60 +356,46 @@ protected Compression.Algorithm
getValueCompressionAlgorithm() {
@Override
protected boolean readNext(Entry entry) throws IOException {
+ resetCompression = false;
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
long originalPosition = this.inputStream.getPos();
if (trailerPresent && originalPosition > 0 && originalPosition ==
this.walEditsStopOffset) {
LOG.trace("Reached end of expected edits area at offset {}",
originalPosition);
return false;
}
- WALKey.Builder builder = WALKey.newBuilder();
- long size = 0;
boolean resetPosition = false;
- // by default, we should reset the compression when seeking back after
reading something
- resetCompression = true;
try {
- long available = -1;
+ WALKey walKey;
try {
- int firstByte = this.inputStream.read();
- if (firstByte == -1) {
- throw new EOFException();
- }
- size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
- // available may be < 0 on local fs for instance. If so, can't depend
on it.
- available = this.inputStream.available();
- if (available > 0 && available < size) {
- // if we quit here, we have just read the length, no actual data
yet, which means we
- // haven't put anything into the compression dictionary yet, so when
seeking back to the
- // last good position, we do not need to reset compression context.
- // This is very useful for saving the extra effort for
reconstructing the compression
- // dictionary, where we need to read from the beginning instead of
just seek to the
- // position, as DFSInputStream implement the available method, so in
most cases we will
- // reach here if there are not enough data.
- resetCompression = false;
- throw new EOFException("Available stream not enough for edit, "
- + "inputStream.available()= " + this.inputStream.available() + ",
" + "entry size= "
- + size + " at offset = " + this.inputStream.getPos());
+ walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALKey.parser());
+ } catch (InvalidProtocolBufferException e) {
+ if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
+ // only rethrow EOF if it indicates an EOF, or we have reached the
partial WALTrailer
+ resetPosition = true;
+ throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring;
originalPosition="
+ + originalPosition + ", currentPosition=" +
this.inputStream.getPos()).initCause(e);
+ } else {
+ throw e;
}
- ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream,
size), (int) size);
- } catch (InvalidProtocolBufferException ipbe) {
- resetPosition = true;
- throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring;
originalPosition="
- + originalPosition + ", currentPosition=" +
this.inputStream.getPos() + ", messageSize="
- + size + ", currentAvailable=" + available).initCause(ipbe);
+ } catch (EOFException e) {
+ // append more detailed information
+ throw (EOFException) new EOFException("EOF while reading WAL key;
originalPosition="
+ + originalPosition + ", currentPosition=" +
this.inputStream.getPos()).initCause(e);
}
- if (!builder.isInitialized()) {
- // TODO: not clear if we should try to recover from corrupt PB that
looks semi-legit.
- // If we can get the KV count, we could, theoretically, try to get
next record.
- throw new EOFException("Partial PB while reading WAL, "
- + "probably an unexpected EOF, ignoring. current offset=" +
this.inputStream.getPos());
- }
- WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
LOG.debug("WALKey has no KVs that follow it; trying the next one.
current offset={}",
this.inputStream.getPos());
return true;
}
+ // Starting from here, we will start to read cells, which will change
the content in
+ // compression dictionary, so if we fail in the below operations, when
resetting, we also need
+ // to clear the compression context, and read from the beginning to
reconstruct the
+ // compression dictionary, instead of seeking to the position directly.
+ // This is very useful for saving the extra effort for reconstructing
the compression
+ // dictionary, as DFSInputStream implement the available method, so in
most cases we will
+ // not reach here if there are not enough data.
+ resetCompression = true;
Review Comment:
Interesting. So this was introduced by HBASE-27621, and it looks like it has
not made it into a release yet, correct?
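To check my reading of the new flag placement, here is a toy model of it. This is not the actual reader: `ResetCompressionSketch` and the `failAt` parameter are hypothetical, and only the ordering of the `resetCompression` assignments is taken from the hunk above.

```java
import java.io.EOFException;

final class ResetCompressionSketch {
  // Mirrors the field in the hunk: true means the compression dictionary
  // has to be rebuilt from the beginning of the file when retrying.
  boolean resetCompression;

  // Hypothetical stand-in for one readNext attempt.
  // failAt is "key", "cells", or anything else for a successful read.
  boolean readNext(String failAt) throws EOFException {
    resetCompression = false;
    if ("key".equals(failAt)) {
      // Only the WALKey was being read; nothing touched the dictionary yet.
      throw new EOFException("EOF while reading WAL key");
    }
    // From here on cells are read, which changes the dictionary content.
    resetCompression = true;
    if ("cells".equals(failAt)) {
      throw new EOFException("EOF while reading cells");
    }
    return true;
  }
}
```

If that matches the intent, a failure while reading the key allows a plain seek back to the last good position, and only a failure while reading cells forces the more expensive rebuild.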