Apache9 commented on code in PR #5059:
URL: https://github.com/apache/hbase/pull/5059#discussion_r1118238535
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java:
##########
@@ -358,60 +356,46 @@ protected Compression.Algorithm
getValueCompressionAlgorithm() {
@Override
protected boolean readNext(Entry entry) throws IOException {
+ resetCompression = false;
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
long originalPosition = this.inputStream.getPos();
if (trailerPresent && originalPosition > 0 && originalPosition ==
this.walEditsStopOffset) {
LOG.trace("Reached end of expected edits area at offset {}",
originalPosition);
return false;
}
- WALKey.Builder builder = WALKey.newBuilder();
- long size = 0;
boolean resetPosition = false;
- // by default, we should reset the compression when seeking back after
reading something
- resetCompression = true;
try {
- long available = -1;
+ WALKey walKey;
try {
- int firstByte = this.inputStream.read();
- if (firstByte == -1) {
- throw new EOFException();
- }
- size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
- // available may be < 0 on local fs for instance. If so, can't depend
on it.
- available = this.inputStream.available();
- if (available > 0 && available < size) {
- // if we quit here, we have just read the length, no actual data
yet, which means we
- // haven't put anything into the compression dictionary yet, so when
seeking back to the
- // last good position, we do not need to reset compression context.
- // This is very useful for saving the extra effort for
reconstructing the compression
- // dictionary, where we need to read from the beginning instead of
just seek to the
- // position, as DFSInputStream implement the available method, so in
most cases we will
- // reach here if there are not enough data.
- resetCompression = false;
- throw new EOFException("Available stream not enough for edit, "
- + "inputStream.available()= " + this.inputStream.available() + ",
" + "entry size= "
- + size + " at offset = " + this.inputStream.getPos());
+ walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALKey.parser());
+ } catch (InvalidProtocolBufferException e) {
+ if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
+ // only rethrow EOF if it indicates an EOF, or we have reached the
partial WALTrailer
+ resetPosition = true;
+ throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring;
originalPosition="
+ + originalPosition + ", currentPosition=" +
this.inputStream.getPos()).initCause(e);
+ } else {
+ throw e;
}
- ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream,
size), (int) size);
- } catch (InvalidProtocolBufferException ipbe) {
- resetPosition = true;
- throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring;
originalPosition="
- + originalPosition + ", currentPosition=" +
this.inputStream.getPos() + ", messageSize="
- + size + ", currentAvailable=" + available).initCause(ipbe);
+ } catch (EOFException e) {
+ // append more detailed information
+ throw (EOFException) new EOFException("EOF while reading WAL key;
originalPosition="
+ + originalPosition + ", currentPosition=" +
this.inputStream.getPos()).initCause(e);
}
- if (!builder.isInitialized()) {
- // TODO: not clear if we should try to recover from corrupt PB that
looks semi-legit.
- // If we can get the KV count, we could, theoretically, try to get
next record.
- throw new EOFException("Partial PB while reading WAL, "
- + "probably an unexpected EOF, ignoring. current offset=" +
this.inputStream.getPos());
- }
- WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
LOG.debug("WALKey has no KVs that follow it; trying the next one.
current offset={}",
this.inputStream.getPos());
return true;
}
+ // Starting from here, we will start to read cells, which will change
the content in
+ // compression dictionary, so if we fail in the below operations, when
resetting, we also need
+ // to clear the compression context, and read from the beginning to
reconstruct the
+ // compression dictionary, instead of seeking to the position directly.
+ // This is very useful for saving the extra effort for reconstructing
the compression
+ // dictionary, as DFSInputStream implement the available method, so in
most cases we will
+ // not reach here if there are not enough data.
+ resetCompression = true;
Review Comment:
I just inverted the condition here; there is no actual logic change. In
HBASE-27621, resetCompression defaults to true and we set it to false when
parsing the WALKey fails. While implementing HBASE-27632, I found that a better
approach is to default it to false and set it to true once we begin to parse
the WALEdit, i.e., the Cells.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]