prashantwason commented on code in PR #5341:
URL: https://github.com/apache/hudi/pull/5341#discussion_r869718009
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -245,97 +286,53 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
continue;
}
}
+ if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
+ LOG.info("Found a corrupt block in " + logFile.getPath());
+ totalCorruptBlocks.incrementAndGet();
+ continue;
+ }
+
+ // Rollback blocks contain information of instants that are failed, collect them in a set..
Review Comment:
This comment seems more relevant to the place where the rollback block is
handled later.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -245,97 +286,53 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
continue;
}
}
+ if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
Review Comment:
Can we handle this in the switch that follows as well? Handling all block
types in one place makes the code flow easier to follow.
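A rough sketch of what I mean is below. The enum, the `ScanResult` holder, and the `scan` method are hypothetical stand-ins for illustration, not the actual Hudi classes; the point is only that the corrupt-block case sits next to the other cases instead of in a separate `if` before the switch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration only; not the real Hudi types.
class BlockDispatchSketch {
  enum BlockType { AVRO_DATA_BLOCK, DELETE_BLOCK, COMMAND_BLOCK, CORRUPT_BLOCK }

  static final class ScanResult {
    final List<BlockType> dataAndDeleteBlocks = new ArrayList<>();
    int totalCorruptBlocks = 0;
    int totalRollbacks = 0;
  }

  // Every block type is dispatched from the same switch.
  static ScanResult scan(List<BlockType> blocks) {
    ScanResult result = new ScanResult();
    for (BlockType type : blocks) {
      switch (type) {
        case AVRO_DATA_BLOCK:
        case DELETE_BLOCK:
          // Data and delete blocks are queued for later processing.
          result.dataAndDeleteBlocks.add(type);
          break;
        case COMMAND_BLOCK:
          // Simplified: the sketch treats every command block as a rollback.
          result.totalRollbacks++;
          break;
        case CORRUPT_BLOCK:
          // Corrupt blocks are counted and otherwise skipped.
          result.totalCorruptBlocks++;
          break;
        default:
          throw new UnsupportedOperationException("Block type not yet supported.");
      }
    }
    return result;
  }

  public static void main(String[] args) {
    ScanResult r = scan(Arrays.asList(
        BlockType.AVRO_DATA_BLOCK, BlockType.CORRUPT_BLOCK, BlockType.DELETE_BLOCK));
    System.out.println(r.dataAndDeleteBlocks.size() + " queued, "
        + r.totalCorruptBlocks + " corrupt");
  }
}
```

With this shape the `continue` is no longer needed, since the `break` in the corrupt-block case already ends handling of that block.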
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -218,7 +221,45 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
Review Comment:
Let's also remove the reverseReader argument, as it is no longer supported.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -218,7 +221,45 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
Review Comment:
Let's also remove the readBlocksLazily argument, as it is now required to
always be true.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -245,97 +286,53 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
continue;
}
}
+ if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
+ LOG.info("Found a corrupt block in " + logFile.getPath());
+ totalCorruptBlocks.incrementAndGet();
+ continue;
+ }
+
+ // Rollback blocks contain information of instants that are failed, collect them in a set..
switch (logBlock.getBlockType()) {
case HFILE_DATA_BLOCK:
case AVRO_DATA_BLOCK:
case PARQUET_DATA_BLOCK:
- LOG.info("Reading a data block from file " + logFile.getPath() + "
at instant "
- + logBlock.getLogBlockHeader().get(INSTANT_TIME));
- if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
- // If this is an avro data block belonging to a different
commit/instant,
- // then merge the last blocks and records into the main result
- processQueuedBlocksForInstant(currentInstantLogBlocks,
scannedLogFiles.size(), keySpecOpt);
- }
- // store the current block
- currentInstantLogBlocks.push(logBlock);
- break;
case DELETE_BLOCK:
- LOG.info("Reading a delete block from file " + logFile.getPath());
- if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
- // If this is a delete data block belonging to a different
commit/instant,
- // then merge the last blocks and records into the main result
- processQueuedBlocksForInstant(currentInstantLogBlocks,
scannedLogFiles.size(), keySpecOpt);
- }
- // store deletes so can be rolled back
- currentInstantLogBlocks.push(logBlock);
+ dataAndDeleteBlocks.add(logBlock);
break;
case COMMAND_BLOCK:
- // Consider the following scenario
- // (Time 0, C1, Task T1) -> Running
- // (Time 1, C1, Task T1) -> Failed (Wrote either a corrupt block or a correct
- // DataBlock (B1) with commitTime C1
- // (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the attempt number is 2)
- // (Time 3, C1, Task T1.2) -> Finished (Wrote a correct DataBlock B2)
- // Now a logFile L1 can have 2 correct Datablocks (B1 and B2) which are the same.
- // Say, commit C1 eventually failed and a rollback is triggered.
- // Rollback will write only 1 rollback block (R1) since it assumes one block is
- // written per ingestion batch for a file but in reality we need to rollback (B1 & B2)
- // The following code ensures the same rollback block (R1) is used to rollback
- // both B1 & B2
LOG.info("Reading a command block from file " + logFile.getPath());
// This is a command block - take appropriate action based on the command
HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
- String targetInstantForCommandBlock =
-
logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
- switch (commandBlock.getType()) { // there can be different types
of command blocks
- case ROLLBACK_PREVIOUS_BLOCK:
- // Rollback the last read log block
- // Get commit time from last record block, compare with
targetCommitTime,
- // rollback only if equal, this is required in scenarios of
invalid/extra
- // rollback blocks written due to failures during the rollback
operation itself
- // and ensures the same rollback block (R1) is used to
rollback both B1 & B2 with
- // same instant_time
- int numBlocksRolledBack = 0;
- totalRollbacks.incrementAndGet();
- while (!currentInstantLogBlocks.isEmpty()) {
- HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
- // handle corrupt blocks separately since they may not have
metadata
- if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
- LOG.info("Rolling back the last corrupted log block read
in " + logFile.getPath());
- currentInstantLogBlocks.pop();
- numBlocksRolledBack++;
- } else if
(targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME)))
{
- // rollback last data block or delete block
- LOG.info("Rolling back the last log block read in " +
logFile.getPath());
- currentInstantLogBlocks.pop();
- numBlocksRolledBack++;
- } else if (!targetInstantForCommandBlock
-
.contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME)))
{
- // invalid or extra rollback block
- LOG.warn("TargetInstantTime " +
targetInstantForCommandBlock
- + " invalid or extra rollback command block in " +
logFile.getPath());
- break;
- } else {
- // this should not happen ideally
- LOG.warn("Unable to apply rollback command block in " +
logFile.getPath());
- }
- }
- LOG.info("Number of applied rollback blocks " +
numBlocksRolledBack);
- break;
- default:
- throw new UnsupportedOperationException("Command type not yet
supported.");
+ if (commandBlock.getType().equals(ROLLBACK_BLOCK)) {
+ totalRollbacks.incrementAndGet();
+ String targetInstantForCommandBlock =
+ logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
+ targetRollbackInstants.add(targetInstantForCommandBlock);
+ } else {
+ throw new UnsupportedOperationException("Command type not yet supported.");
}
break;
- case CORRUPT_BLOCK:
- LOG.info("Found a corrupt block in " + logFile.getPath());
- totalCorruptBlocks.incrementAndGet();
- // If there is a corrupt block - we will assume that this was the
next data block
- currentInstantLogBlocks.push(logBlock);
- break;
default:
- throw new UnsupportedOperationException("Block type not supported
yet");
+ throw new UnsupportedOperationException("Block type not yet supported.");
}
}
+
+ int numBlocksRolledBack = 0;
+ // This is a reverse traversal on the collected data blocks.
Review Comment:
Nit: this should say "collected data and delete blocks".
How is this a reverse traversal? Isn't the for-loop a forward traversal?
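To make the distinction concrete, here is a minimal sketch with a hypothetical list of block names (not the PR's code). A plain for-each visits the blocks in the order they were collected; a reverse traversal has to walk the indices downward (or use a descending iterator).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; block names stand in for the collected log blocks.
class TraversalSketch {
  // Forward traversal: oldest collected block first.
  static List<String> forward(List<String> blocks) {
    List<String> visited = new ArrayList<>();
    for (String block : blocks) {
      visited.add(block);
    }
    return visited;
  }

  // Reverse traversal: most recently collected block first.
  static List<String> reverse(List<String> blocks) {
    List<String> visited = new ArrayList<>();
    for (int i = blocks.size() - 1; i >= 0; i--) {
      visited.add(blocks.get(i));
    }
    return visited;
  }

  public static void main(String[] args) {
    List<String> blocks = Arrays.asList("B1", "B2", "B3");
    System.out.println(forward(blocks)); // [B1, B2, B3]
    System.out.println(reverse(blocks)); // [B3, B2, B1]
  }
}
```

If the loop in the PR has the first shape, either the comment or the loop should change so that the two agree.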
##########
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java:
##########
@@ -839,20 +839,24 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di
writer.appendBlock(dataBlock);
// Write 2
+ header = new HashMap<>();
Review Comment:
header.clear() also works here, instead of allocating a new HashMap each time.
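One caveat, which I have not verified against the Hudi block classes: reusing the map via `clear()` is only safe if the block copies its header on construction. The sketch below uses a hypothetical `Block` class that keeps the caller's map by reference, to show what would go wrong in that case.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; Block is NOT a Hudi class.
class HeaderReuseSketch {
  // Assumption for the sketch: the block stores the caller's map without copying it.
  static final class Block {
    final Map<String, String> header;

    Block(Map<String, String> header) {
      this.header = header;
    }
  }

  // Reusing one map via clear(): the first block observes the second write.
  static String firstBlockInstantAfterClear() {
    Map<String, String> header = new HashMap<>();
    header.put("INSTANT_TIME", "100");
    Block first = new Block(header);
    header.clear();
    header.put("INSTANT_TIME", "101");
    return first.header.get("INSTANT_TIME");
  }

  // Allocating a fresh map: the first block keeps its own header.
  static String firstBlockInstantAfterNewMap() {
    Map<String, String> header = new HashMap<>();
    header.put("INSTANT_TIME", "100");
    Block first = new Block(header);
    header = new HashMap<>();
    header.put("INSTANT_TIME", "101");
    return first.header.get("INSTANT_TIME");
  }

  public static void main(String[] args) {
    System.out.println(firstBlockInstantAfterClear());  // 101
    System.out.println(firstBlockInstantAfterNewMap()); // 100
  }
}
```

If the blocks already copy or serialize the header before the next write, `clear()` is fine and saves the allocation; otherwise the fresh `HashMap` in the PR is the safer choice.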