This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e636d82ee0d [HUDI-6356] Fix rollback logic on
AbstractHoodieLogRecordReader when roll backed blocks are created with same
instant (#7729)
e636d82ee0d is described below
commit e636d82ee0d742a5edb0529e08c7eda942c5d7c5
Author: Surya Prasanna <[email protected]>
AuthorDate: Tue Jun 13 09:02:33 2023 -0700
[HUDI-6356] Fix rollback logic on AbstractHoodieLogRecordReader when roll
backed blocks are created with same instant (#7729)
Enhance rollback logic in AbstractHoodieLogRecordReader
Summary:
On the metadata table, the deltacommit timestamp is equivalent to the main
table commit's timestamp. So, if a metadata sync fails, it reuses the same timestamp.
This change fixes the case where log blocks were treated as valid even though
their corresponding rollback had already been visited.
---
.../table/log/AbstractHoodieLogRecordReader.java | 9 +-
.../common/functional/TestHoodieLogFormat.java | 120 ++++++++++++++++++++-
2 files changed, 121 insertions(+), 8 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 865be1bb808..7304cc3d1fb 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -499,6 +499,8 @@ public abstract class AbstractHoodieLogRecordReader {
String targetInstantForCommandBlock =
logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
targetRollbackInstants.add(targetInstantForCommandBlock);
+ orderedInstantsList.remove(targetInstantForCommandBlock);
+ instantToBlocksMap.remove(targetInstantForCommandBlock);
} else {
throw new UnsupportedOperationException("Command type not yet
supported.");
}
@@ -527,13 +529,6 @@ public abstract class AbstractHoodieLogRecordReader {
*/
for (int i = orderedInstantsList.size() - 1; i >= 0; i--) {
String instantTime = orderedInstantsList.get(i);
-
- // Exclude the blocks which are included in targetRollbackInstants set.
- // Here, rollback can include instants affiliated to deltacommits or
log compaction commits.
- if (targetRollbackInstants.contains(instantTime)) {
- numBlocksRolledBack += instantToBlocksMap.get(instantTime).size();
- continue;
- }
List<HoodieLogBlock> instantsBlocks =
instantToBlocksMap.get(instantTime);
if (instantsBlocks.size() == 0) {
throw new HoodieException("Data corrupted while writing. Found zero
blocks for an instant " + instantTime);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 9920ff2d5ed..af4e5d72d85 100755
---
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -75,6 +76,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -148,9 +150,17 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
basePath = new Path(workDir.toString(), testInfo.getDisplayName() +
System.currentTimeMillis()).toString();
partitionPath = new Path(basePath, "partition_path");
spillableBasePath = new Path(workDir.toString(),
".spillable_path").toString();
+ assertTrue(fs.mkdirs(partitionPath));
HoodieTestUtils.init(fs.getConf(), basePath,
HoodieTableType.MERGE_ON_READ);
}
+ @AfterEach
+ public void tearDown() throws IOException {
+ fs.delete(new Path(basePath), true);
+ fs.delete(partitionPath, true);
+ fs.delete(new Path(spillableBasePath), true);
+ }
+
@Test
public void testEmptyLog() throws IOException {
Writer writer =
@@ -1339,9 +1349,9 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.build();
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
- assertEquals(100, readKeys.size(), "Stream collect should return all 200
records after rollback of delete");
final List<Boolean> newEmptyPayloads = new ArrayList<>();
scanner.forEach(s -> {
try {
@@ -1364,6 +1374,114 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
scanner.close();
}
+ @ParameterizedTest
+ @MethodSource("testArguments")
+ public void
testAvroLogRecordReaderWithCommitBeforeAndAfterRollback(ExternalSpillableMap.DiskMapType
diskMapType,
+ boolean
isCompressionEnabled,
+ boolean
readBlocksLazily,
+ boolean
enableOptimizedLogBlocksScan)
+ throws IOException, URISyntaxException, InterruptedException {
+ Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+ // Set a small threshold so that every block is a new version
+ String fileId = "test-fileid111";
+ Writer writer =
+
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withFileId(fileId).overBaseCommit("100").withFs(fs).build();
+
+ // Write 1 -> 100 records are written
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> copyOfRecords1 = records1.stream()
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+ header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+ HoodieDataBlock dataBlock =
getDataBlock(HoodieLogBlockType.AVRO_DATA_BLOCK, records1, header);
+ writer.appendBlock(dataBlock);
+
+ // Write 2 -> 100 records are written
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
+ List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> allRecordsInserted = records2.stream()
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ dataBlock = getDataBlock(HoodieLogBlockType.AVRO_DATA_BLOCK, records2,
header);
+ writer.appendBlock(dataBlock);
+ allRecordsInserted.addAll(copyOfRecords1);
+
+ // Delete 50 keys from write 1 batch
+ List<HoodieKey> deletedKeys = copyOfRecords1.stream()
+ .map(s -> (new HoodieKey(((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+ ((GenericRecord)
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+ .collect(Collectors.toList()).subList(0, 50);
+
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
+ HoodieDeleteBlock deleteBlock = new
HoodieDeleteBlock(deletedKeys.stream().map(deletedKey ->
+ DeleteRecord.create(deletedKey.getRecordKey(),
deletedKey.getPartitionPath()))
+ .collect(Collectors.toList()).toArray(new DeleteRecord[0]), header);
+ writer.appendBlock(deleteBlock);
+
+ List<String> allLogFiles =
+ FSUtils.getAllLogFiles(fs, partitionPath, fileId,
HoodieLogFile.DELTA_EXTENSION, "100")
+ .map(s -> s.getPath().toString()).collect(Collectors.toList());
+
+ // Rollback the last block i.e. a data block.
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
+ header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "102");
+ header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
+ HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
+ writer.appendBlock(commandBlock);
+
+ // Recreate the delete block which should have been removed from
consideration because of rollback block next to it.
+ Map<HoodieLogBlock.HeaderMetadataType, String> deleteBlockHeader = new
HashMap<>();
+ deleteBlockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME,
"102");
+ deleteBlock = new HoodieDeleteBlock(
+ deletedKeys.stream().map(deletedKey ->
+ DeleteRecord.create(deletedKey.getRecordKey(),
deletedKey.getPartitionPath()))
+ .collect(Collectors.toList()).toArray(new DeleteRecord[0]),
deleteBlockHeader);
+ writer.appendBlock(deleteBlock);
+
+ FileCreateUtils.createDeltaCommit(basePath, "102", fs);
+
+ final List<String> readKeys = new ArrayList<>();
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(allLogFiles)
+ .withReaderSchema(schema)
+ .withLatestInstantTime("103")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(BUFFER_SIZE)
+ .withSpillableMapBasePath(spillableBasePath)
+ .withDiskMapType(diskMapType)
+ .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
+
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
+ .build();
+ scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
+ final List<Boolean> newEmptyPayloads = new ArrayList<>();
+ scanner.forEach(s -> {
+ try {
+ if (!((HoodieRecordPayload)
s.getData()).getInsertValue(schema).isPresent()) {
+ newEmptyPayloads.add(true);
+ }
+ } catch (IOException io) {
+ throw new UncheckedIOException(io);
+ }
+ });
+ assertEquals(200, readKeys.size(), "Stream collect should return all 200
records");
+ assertEquals(50, newEmptyPayloads.size(), "Stream collect should return 50
records with empty payloads.");
+ List<String> recordKeysInserted = allRecordsInserted.stream().map(s ->
((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
+ .collect(Collectors.toList());
+ Collections.sort(recordKeysInserted);
+ Collections.sort(readKeys);
+ assertEquals(recordKeysInserted, readKeys, "CompositeAvroLogReader should
return 150 records from 2 versions");
+ writer.close();
+ scanner.close();
+ }
+
@ParameterizedTest
@MethodSource("testArguments")
public void
testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType
diskMapType,