This is an automated email from the ASF dual-hosted git repository. yuzhaojing pushed a commit to branch release-0.12.1-rc1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e2a9d3b790c9192bf03b29a579b6fc4c30d5e8da Author: hj2016 <[email protected]> AuthorDate: Wed Sep 28 23:02:59 2022 +0800 [HUDI-2780] Fix the issue of Mor log skipping complete blocks when reading data (#4015) Co-authored-by: huangjing02 <[email protected]> Co-authored-by: sivabalan <[email protected]> --- .../hudi/common/table/log/HoodieLogFileReader.java | 15 ++-- .../common/functional/TestHoodieLogFormat.java | 79 ++++++++++++++-------- 2 files changed, 57 insertions(+), 37 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index cb16c8b141..11d9e75f4b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -149,13 +149,14 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { // for max of Integer size private HoodieLogBlock readBlock() throws IOException { int blockSize; + long blockStartPos = inputStream.getPos(); try { // 1 Read the total size of the block blockSize = (int) inputStream.readLong(); } catch (EOFException | CorruptedLogFileException e) { // An exception reading any of the above indicates a corrupt block // Create a corrupt block by finding the next MAGIC marker or EOF - return createCorruptBlock(); + return createCorruptBlock(blockStartPos); } // We may have had a crash which could have written this block partially @@ -163,7 +164,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { // block) or EOF. If we did not find either of it, then this block is a corrupted block. boolean isCorrupted = isBlockCorrupted(blockSize); if (isCorrupted) { - return createCorruptBlock(); + return createCorruptBlock(blockStartPos); } // 2. Read the version for this log format @@ -249,14 +250,14 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { return HoodieLogBlockType.values()[type]; } - private HoodieLogBlock createCorruptBlock() throws IOException { - LOG.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos()); - long currentPos = inputStream.getPos(); + private HoodieLogBlock createCorruptBlock(long blockStartPos) throws IOException { + LOG.info("Log " + logFile + " has a corrupted block at " + blockStartPos); + inputStream.seek(blockStartPos); long nextBlockOffset = scanForNextAvailableBlockOffset(); // Rewind to the initial start and read corrupted bytes till the nextBlockOffset - inputStream.seek(currentPos); + inputStream.seek(blockStartPos); LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset); - int corruptedBlockSize = (int) (nextBlockOffset - currentPos); + int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos); long contentPosition = inputStream.getPos(); Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily); HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc = diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index c08c2f7029..1d15822ff1 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -604,20 +604,11 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @Test public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException { - Writer writer = - HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) - .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); - List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100); - Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(); - header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); - header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); - HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header); - writer.appendBlock(dataBlock); - writer.close(); + HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100); // Append some arbit byte[] to the end of the log (mimics a partially written commit) fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); - FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath()); + FSDataOutputStream outputStream = fs.append(logFile.getPath()); // create a block with outputStream.write(HoodieLogFormat.MAGIC); // Write out a length that does not confirm with the content @@ -632,17 +623,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { outputStream.close(); // Append a proper block that is of the missing length of the corrupted block - writer = - HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) - .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); - records = SchemaTestUtil.generateTestRecords(0, 10); - header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); - dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header); - writer.appendBlock(dataBlock); - writer.close(); + logFile = addValidBlock("test-fileId1", "100", 10); // First round of reads - we should be able to read the first block and then EOF - Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema()); assertTrue(reader.hasNext(), "First block should be available"); reader.next(); assertTrue(reader.hasNext(), "We should have corrupted block next"); @@ -655,7 +639,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { reader.close(); // Simulate another failure back to back - outputStream = fs.append(writer.getLogFile().getPath()); + outputStream = fs.append(logFile.getPath()); // create a block with outputStream.write(HoodieLogFormat.MAGIC); // Write out a length that does not confirm with the content @@ -670,17 +654,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { outputStream.close(); // Should be able to append a new block - writer = - HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) - .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); - records = SchemaTestUtil.generateTestRecords(0, 100); - header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); - dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header); - writer.appendBlock(dataBlock); - writer.close(); + logFile = addValidBlock("test-fileId1", "100", 100); // Second round of reads - we should be able to read the first and last block - reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema()); assertTrue(reader.hasNext(), "First block should be available"); reader.next(); assertTrue(reader.hasNext(), "We should get the 1st corrupted block next"); @@ -696,6 +673,48 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { reader.close(); } + @Test + public void testMissingBlockExceptMagicBytes() throws IOException, URISyntaxException, InterruptedException { + HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100); + + // Append just magic bytes and move onto next block + fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); + FSDataOutputStream outputStream = fs.append(logFile.getPath()); + outputStream.write(HoodieLogFormat.MAGIC); + outputStream.flush(); + outputStream.close(); + + // Append a proper block + logFile = addValidBlock("test-fileId1", "100", 10); + + // First round of reads - we should be able to read the first block and then EOF + Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema()); + assertTrue(reader.hasNext(), "First block should be available"); + reader.next(); + assertTrue(reader.hasNext(), "We should have corrupted block next"); + HoodieLogBlock block = reader.next(); + assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block"); + assertTrue(reader.hasNext(), "Third block should be available"); + reader.next(); + assertFalse(reader.hasNext(), "There should be no more block left"); + + reader.close(); + } + + private HoodieLogFile addValidBlock(String fileId, String commitTime, int numRecords) throws IOException, URISyntaxException, InterruptedException { + Writer writer = + HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withFileId(fileId).overBaseCommit(commitTime).withFs(fs).build(); + List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, numRecords); + Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); + HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header); + writer.appendBlock(dataBlock); + writer.close(); + return writer.getLogFile(); + } + @Test public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxException, InterruptedException { Writer writer =
