This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9966b2c556 [HUDI-2780] Fix the issue of Mor log skipping complete 
blocks when reading data (#4015)
9966b2c556 is described below

commit 9966b2c5560db6cc844ba5108d536f6fc2f43ad8
Author: hj2016 <[email protected]>
AuthorDate: Wed Sep 28 23:02:59 2022 +0800

    [HUDI-2780] Fix the issue of Mor log skipping complete blocks when reading 
data (#4015)
    
    
    Co-authored-by: huangjing02 <[email protected]>
    Co-authored-by: sivabalan <[email protected]>
---
 .../hudi/common/table/log/HoodieLogFileReader.java | 15 ++--
 .../common/functional/TestHoodieLogFormat.java     | 79 ++++++++++++++--------
 2 files changed, 57 insertions(+), 37 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index c784684cc0..2e2af79823 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -150,13 +150,14 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
   // for max of Integer size
   private HoodieLogBlock readBlock() throws IOException {
     int blockSize;
+    long blockStartPos = inputStream.getPos();
     try {
       // 1 Read the total size of the block
       blockSize = (int) inputStream.readLong();
     } catch (EOFException | CorruptedLogFileException e) {
       // An exception reading any of the above indicates a corrupt block
       // Create a corrupt block by finding the next MAGIC marker or EOF
-      return createCorruptBlock();
+      return createCorruptBlock(blockStartPos);
     }
 
     // We may have had a crash which could have written this block partially
@@ -164,7 +165,7 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
     // block) or EOF. If we did not find either of it, then this block is a 
corrupted block.
     boolean isCorrupted = isBlockCorrupted(blockSize);
     if (isCorrupted) {
-      return createCorruptBlock();
+      return createCorruptBlock(blockStartPos);
     }
 
     // 2. Read the version for this log format
@@ -253,14 +254,14 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
     return HoodieLogBlockType.values()[type];
   }
 
-  private HoodieLogBlock createCorruptBlock() throws IOException {
-    LOG.info("Log " + logFile + " has a corrupted block at " + 
inputStream.getPos());
-    long currentPos = inputStream.getPos();
+  private HoodieLogBlock createCorruptBlock(long blockStartPos) throws 
IOException {
+    LOG.info("Log " + logFile + " has a corrupted block at " + blockStartPos);
+    inputStream.seek(blockStartPos);
     long nextBlockOffset = scanForNextAvailableBlockOffset();
     // Rewind to the initial start and read corrupted bytes till the 
nextBlockOffset
-    inputStream.seek(currentPos);
+    inputStream.seek(blockStartPos);
     LOG.info("Next available block in " + logFile + " starts at " + 
nextBlockOffset);
-    int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
+    int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos);
     long contentPosition = inputStream.getPos();
     Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, 
corruptedBlockSize, readBlockLazily);
     HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 6d084f8e17..f87e5a41b8 100755
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -700,20 +700,11 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
 
   @Test
   public void testAppendAndReadOnCorruptedLog() throws IOException, 
URISyntaxException, InterruptedException {
-    Writer writer =
-        
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
-    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString());
-    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, 
header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
 
     // Append some arbit byte[] to the end of the log (mimics a partially 
written commit)
     fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
-    FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
+    FSDataOutputStream outputStream = fs.append(logFile.getPath());
     // create a block with
     outputStream.write(HoodieLogFormat.MAGIC);
     // Write out a length that does not confirm with the content
@@ -728,17 +719,10 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     outputStream.close();
 
     // Append a proper block that is of the missing length of the corrupted 
block
-    writer =
-        
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    records = SchemaTestUtil.generateTestRecords(0, 10);
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString());
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    logFile = addValidBlock("test-fileId1", "100", 10);
 
     // First round of reads - we should be able to read the first block and 
then EOF
-    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), 
SchemaTestUtil.getSimpleSchema());
+    Reader reader = HoodieLogFormat.newReader(fs, logFile, 
SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should have corrupted block next");
@@ -751,7 +735,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     reader.close();
 
     // Simulate another failure back to back
-    outputStream = fs.append(writer.getLogFile().getPath());
+    outputStream = fs.append(logFile.getPath());
     // create a block with
     outputStream.write(HoodieLogFormat.MAGIC);
     // Write out a length that does not confirm with the content
@@ -766,17 +750,10 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     outputStream.close();
 
     // Should be able to append a new block
-    writer =
-        
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    records = SchemaTestUtil.generateTestRecords(0, 100);
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString());
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    logFile = addValidBlock("test-fileId1", "100", 100);
 
     // Second round of reads - we should be able to read the first and last 
block
-    reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), 
SchemaTestUtil.getSimpleSchema());
+    reader = HoodieLogFormat.newReader(fs, logFile, 
SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should get the 1st corrupted block next");
@@ -792,6 +769,48 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     reader.close();
   }
 
+  @Test
+  public void testMissingBlockExceptMagicBytes() throws IOException, 
URISyntaxException, InterruptedException {
+    HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
+
+    // Append just magic bytes and move onto next block
+    fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
+    FSDataOutputStream outputStream = fs.append(logFile.getPath());
+    outputStream.write(HoodieLogFormat.MAGIC);
+    outputStream.flush();
+    outputStream.close();
+
+    // Append a proper block
+    logFile = addValidBlock("test-fileId1", "100", 10);
+
+    // First round of reads - we should be able to read the first block and 
then EOF
+    Reader reader = HoodieLogFormat.newReader(fs, logFile, 
SchemaTestUtil.getSimpleSchema());
+    assertTrue(reader.hasNext(), "First block should be available");
+    reader.next();
+    assertTrue(reader.hasNext(), "We should have corrupted block next");
+    HoodieLogBlock block = reader.next();
+    assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The 
read block should be a corrupt block");
+    assertTrue(reader.hasNext(), "Third block should be available");
+    reader.next();
+    assertFalse(reader.hasNext(), "There should be no more block left");
+
+    reader.close();
+  }
+
+  private HoodieLogFile addValidBlock(String fileId, String commitTime, int 
numRecords) throws IOException, URISyntaxException, InterruptedException {
+    Writer writer =
+        
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId(fileId).overBaseCommit(commitTime).withFs(fs).build();
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 
numRecords);
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
getSimpleSchema().toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, 
header);
+    writer.appendBlock(dataBlock);
+    writer.close();
+    return writer.getLogFile();
+  }
+
   @Test
   public void testValidateCorruptBlockEndPosition() throws IOException, 
URISyntaxException, InterruptedException {
     Writer writer =

Reply via email to