This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9d46ce3 [HUDI-409] Match header and footer block length to improve
corrupted block detection (#1332)
9d46ce3 is described below
commit 9d46ce380a3929605b3838238e8aa07a9918ab7a
Author: Ramachandran M S <[email protected]>
AuthorDate: Tue Mar 3 13:26:54 2020 -0800
[HUDI-409] Match header and footer block length to improve corrupted block
detection (#1332)
---
.../hudi/common/table/log/HoodieLogFileReader.java | 16 +++++++++++++++-
.../common/table/log/HoodieLogFormatWriter.java | 3 +++
.../hudi/common/table/log/TestHoodieLogFormat.java | 21 ++++++++++++++++++---
3 files changed, 36 insertions(+), 4 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 40a5243..53627e6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -231,6 +231,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader
{
inputStream.seek(currentPos + blocksize);
}
} catch (EOFException e) {
+ LOG.info("Found corrupted block in file " + logFile + " with block
size(" + blocksize + ") running past EOF");
// this is corrupt
// This seek is required because contract of seek() is different for
naked DFSInputStream vs BufferedFSInputStream
// release-3.1.0-RC1/DFSInputStream.java#L1455
@@ -239,12 +240,26 @@ class HoodieLogFileReader implements
HoodieLogFormat.Reader {
return true;
}
+ // check if the blocksize mentioned in the footer is the same as the
header; by seeking back the length of a long
+ // the backward seek does not incur additional IO as {@link
org.apache.hadoop.hdfs.DFSInputStream#seek()}
+ // only moves the index. actual IO happens on the next read operation
+ inputStream.seek(inputStream.getPos() - Long.BYTES);
+ // Block size in the footer includes the magic header, which the header
does not include.
+ // So we have to shorten the footer block size by the size of magic hash
+ long blockSizeFromFooter = inputStream.readLong() - MAGIC_BUFFER.length;
+ if (blocksize != blockSizeFromFooter) {
+ LOG.info("Found corrupted block in file " + logFile + ". Header block
size(" + blocksize
+ + ") did not match the footer block size(" + blockSizeFromFooter
+ ")");
+ inputStream.seek(currentPos);
+ return true;
+ }
try {
readMagic();
// all good - either we found the sync marker or EOF. Reset position and
continue
return false;
} catch (CorruptedLogFileException e) {
// This is a corrupted block
+ LOG.info("Found corrupted block in file " + logFile + ". No magic hash
found right after footer block size entry");
return true;
} finally {
inputStream.seek(currentPos);
@@ -310,7 +325,6 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader
{
}
private boolean hasNextMagic() throws IOException {
- long pos = inputStream.getPos();
// 1. Read magic header from the start of the block
inputStream.readFully(MAGIC_BUFFER, 0, 6);
return Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index 201b879..1b2e188 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -158,6 +158,9 @@ public class HoodieLogFormatWriter implements
HoodieLogFormat.Writer {
this.output.write(footerBytes);
// 9. Write the total size of the log block (including magic) which is
everything written
// until now (for reverse pointer)
+ // Update: this information is now used in determining if a block is
corrupt by comparing to the
+ // block size in header. This change assumes that the block size will be
the last data written
+ // to a block. Read will break if any data is written past this point
for a block.
this.output.writeLong(this.output.size() - currentSize);
// Flush every block to disk
flush();
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
index b896d1f..17d04da 100755
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
@@ -491,16 +491,27 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
// Write out a length that does not confirm with the content
- outputStream.writeLong(1000);
+ outputStream.writeLong(474);
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
// Write out a length that does not confirm with the content
- outputStream.writeLong(500);
- // Write out some bytes
+ outputStream.writeLong(400);
+ // Write out incomplete content
outputStream.write("something-random".getBytes());
outputStream.flush();
outputStream.close();
+ // Append a proper block that is of the missing length of the corrupted
block
+ writer =
+
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+ records = SchemaTestUtil.generateTestRecords(0, 10);
+ header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA,
getSimpleSchema().toString());
+ dataBlock = new HoodieAvroDataBlock(records, header);
+ writer = writer.appendBlock(dataBlock);
+ writer.close();
+
+
// First round of reads - we should be able to read the first block and
then EOF
Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(),
SchemaTestUtil.getSimpleSchema());
assertTrue("First block should be available", reader.hasNext());
@@ -508,6 +519,8 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
assertTrue("We should have corrupted block next", reader.hasNext());
HoodieLogBlock block = reader.next();
assertEquals("The read block should be a corrupt block",
HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType());
+ assertTrue("Third block should be available", reader.hasNext());
+ reader.next();
assertFalse("There should be no more block left", reader.hasNext());
reader.close();
@@ -543,6 +556,8 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
reader.next();
assertTrue("We should get the 1st corrupted block next", reader.hasNext());
reader.next();
+ assertTrue("Third block should be available", reader.hasNext());
+ reader.next();
assertTrue("We should get the 2nd corrupted block next", reader.hasNext());
block = reader.next();
assertEquals("The read block should be a corrupt block",
HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType());