This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e636d82ee0d [HUDI-6356] Fix rollback logic in AbstractHoodieLogRecordReader when rolled-back blocks are recreated with the same instant (#7729)
e636d82ee0d is described below

commit e636d82ee0d742a5edb0529e08c7eda942c5d7c5
Author: Surya Prasanna <[email protected]>
AuthorDate: Tue Jun 13 09:02:33 2023 -0700

    [HUDI-6356] Fix rollback logic in AbstractHoodieLogRecordReader when rolled-back blocks are recreated with the same instant (#7729)
    
     Enhance rollback logic in AbstractHoodieLogRecordReader
    
    Summary:
    On the metadata table, a deltacommit's timestamp is the same as the corresponding
    commit's timestamp on the main table. So, if a metadata-table sync fails and is
    retried, the retry reuses the same timestamp.
    This change makes sure that log blocks written with such a reused instant time are
    still treated as valid, even though a rollback command block targeting that instant
    time was already visited earlier in the log.
---
 .../table/log/AbstractHoodieLogRecordReader.java   |   9 +-
 .../common/functional/TestHoodieLogFormat.java     | 120 ++++++++++++++++++++-
 2 files changed, 121 insertions(+), 8 deletions(-)
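
Before the patch itself, a minimal, self-contained sketch of the idea behind the fix. The class and
method names below are hypothetical and heavily simplified; the real AbstractHoodieLogRecordReader
tracks HoodieLogBlock instances and handles many more block types. The point it illustrates: when a
rollback command block is seen, the rolled-back instant is dropped from the bookkeeping immediately,
so a block later rewritten with the same instant time is tracked afresh instead of being skipped in
the final pass.

    // Hypothetical, simplified sketch -- not the actual Hudi reader code.
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class RollbackTrackingSketch {
      // Instant time -> log blocks seen so far for that instant, in scan order.
      private final Map<String, List<String>> instantToBlocksMap = new HashMap<>();
      private final List<String> orderedInstantsList = new ArrayList<>();

      void onDataOrDeleteBlock(String instantTime, String block) {
        instantToBlocksMap.computeIfAbsent(instantTime, k -> {
          orderedInstantsList.add(instantTime);
          return new ArrayList<>();
        }).add(block);
      }

      void onRollbackCommandBlock(String targetInstantTime) {
        // The fix: drop the rolled-back instant from the bookkeeping right away,
        // instead of only remembering it in a targetRollbackInstants set and
        // skipping that instant during the final pass. A block rewritten later
        // with the same instant time (e.g. a retried metadata-table deltacommit)
        // then starts from a clean slate and is no longer masked by the rollback.
        orderedInstantsList.remove(targetInstantTime);
        instantToBlocksMap.remove(targetInstantTime);
      }

      List<String> blocksToMerge() {
        List<String> result = new ArrayList<>();
        for (String instant : orderedInstantsList) {
          result.addAll(instantToBlocksMap.get(instant));
        }
        return result;
      }
    }

The test added below exercises exactly this sequence (a "102" delete block, a rollback targeting
"102", then a recreated "102" delete block): with the old skip-by-set logic the recreated block was
discarded, while with removal-on-rollback it is read back as 50 tombstones.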

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 865be1bb808..7304cc3d1fb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -499,6 +499,8 @@ public abstract class AbstractHoodieLogRecordReader {
               String targetInstantForCommandBlock =
                   logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
               targetRollbackInstants.add(targetInstantForCommandBlock);
+              orderedInstantsList.remove(targetInstantForCommandBlock);
+              instantToBlocksMap.remove(targetInstantForCommandBlock);
             } else {
              throw new UnsupportedOperationException("Command type not yet supported.");
             }
@@ -527,13 +529,6 @@ public abstract class AbstractHoodieLogRecordReader {
        */
       for (int i = orderedInstantsList.size() - 1; i >= 0; i--) {
         String instantTime = orderedInstantsList.get(i);
-
-        // Exclude the blocks which are included in targetRollbackInstants set.
-        // Here, rollback can include instants affiliated to deltacommits or log compaction commits.
-        if (targetRollbackInstants.contains(instantTime)) {
-          numBlocksRolledBack += instantToBlocksMap.get(instantTime).size();
-          continue;
-        }
        List<HoodieLogBlock> instantsBlocks = instantToBlocksMap.get(instantTime);
         if (instantsBlocks.size() == 0) {
          throw new HoodieException("Data corrupted while writing. Found zero blocks for an instant " + instantTime);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 9920ff2d5ed..af4e5d72d85 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieArchivedLogFile;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -75,6 +76,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -148,9 +150,17 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     basePath = new Path(workDir.toString(), testInfo.getDisplayName() + System.currentTimeMillis()).toString();
     partitionPath = new Path(basePath, "partition_path");
     spillableBasePath = new Path(workDir.toString(), ".spillable_path").toString();
+    assertTrue(fs.mkdirs(partitionPath));
     HoodieTestUtils.init(fs.getConf(), basePath, HoodieTableType.MERGE_ON_READ);
   }
 
+  @AfterEach
+  public void tearDown() throws IOException {
+    fs.delete(new Path(basePath), true);
+    fs.delete(partitionPath, true);
+    fs.delete(new Path(spillableBasePath), true);
+  }
+
   @Test
   public void testEmptyLog() throws IOException {
     Writer writer =
@@ -1339,9 +1349,9 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .build();
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
-    assertEquals(100, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
     final List<Boolean> newEmptyPayloads = new ArrayList<>();
     scanner.forEach(s -> {
       try {
@@ -1364,6 +1374,114 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     scanner.close();
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testAvroLogRecordReaderWithCommitBeforeAndAfterRollback(ExternalSpillableMap.DiskMapType diskMapType,
+                                                           boolean isCompressionEnabled,
+                                                           boolean readBlocksLazily,
+                                                           boolean enableOptimizedLogBlocksScan)
+      throws IOException, URISyntaxException, InterruptedException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    // Set a small threshold so that every block is a new version
+    String fileId = "test-fileid111";
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId(fileId).overBaseCommit("100").withFs(fs).build();
+
+    // Write 1 -> 100 records are written
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords1 = records1.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieDataBlock dataBlock = getDataBlock(HoodieLogBlockType.AVRO_DATA_BLOCK, records1, header);
+    writer.appendBlock(dataBlock);
+
+    // Write 2 -> 100 records are written
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
+    List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> allRecordsInserted = records2.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    dataBlock = getDataBlock(HoodieLogBlockType.AVRO_DATA_BLOCK, records2, header);
+    writer.appendBlock(dataBlock);
+    allRecordsInserted.addAll(copyOfRecords1);
+
+    // Delete 50 keys from write 1 batch
+    List<HoodieKey> deletedKeys = copyOfRecords1.stream()
+        .map(s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+        .collect(Collectors.toList()).subList(0, 50);
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.stream().map(deletedKey ->
+        DeleteRecord.create(deletedKey.getRecordKey(), deletedKey.getPartitionPath()))
+        .collect(Collectors.toList()).toArray(new DeleteRecord[0]), header);
+    writer.appendBlock(deleteBlock);
+
+    List<String> allLogFiles =
+        FSUtils.getAllLogFiles(fs, partitionPath, fileId, HoodieLogFile.DELTA_EXTENSION, "100")
+            .map(s -> s.getPath().toString()).collect(Collectors.toList());
+
+    // Rollback the last block i.e. a data block.
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
+    header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "102");
+    header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
+    HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
+    writer.appendBlock(commandBlock);
+
+    // Recreate the delete block, which should have been removed from consideration because of the rollback block next to it.
+    Map<HoodieLogBlock.HeaderMetadataType, String> deleteBlockHeader = new HashMap<>();
+    deleteBlockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
+    deleteBlock = new HoodieDeleteBlock(
+        deletedKeys.stream().map(deletedKey ->
+            DeleteRecord.create(deletedKey.getRecordKey(), deletedKey.getPartitionPath()))
+            .collect(Collectors.toList()).toArray(new DeleteRecord[0]), deleteBlockHeader);
+    writer.appendBlock(deleteBlock);
+
+    FileCreateUtils.createDeltaCommit(basePath, "102", fs);
+
+    final List<String> readKeys = new ArrayList<>();
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("103")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(BUFFER_SIZE)
+        .withSpillableMapBasePath(spillableBasePath)
+        .withDiskMapType(diskMapType)
+        .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
+        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
+        .build();
+    scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
+    final List<Boolean> newEmptyPayloads = new ArrayList<>();
+    scanner.forEach(s -> {
+      try {
+        if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema).isPresent()) {
+          newEmptyPayloads.add(true);
+        }
+      } catch (IOException io) {
+        throw new UncheckedIOException(io);
+      }
+    });
+    assertEquals(200, readKeys.size(), "Stream collect should return all 200 records");
+    assertEquals(50, newEmptyPayloads.size(), "Stream collect should return 50 records with empty payloads.");
+    List<String> recordKeysInserted = allRecordsInserted.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
+        .collect(Collectors.toList());
+    Collections.sort(recordKeysInserted);
+    Collections.sort(readKeys);
+    assertEquals(recordKeysInserted, readKeys, "CompositeAvroLogReader should return 200 records from 2 versions");
+    writer.close();
+    scanner.close();
+  }
+
   @ParameterizedTest
   @MethodSource("testArguments")
  public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType,
