prashantwason commented on code in PR #5341:
URL: https://github.com/apache/hudi/pull/5341#discussion_r869718009


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -245,97 +286,53 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
             continue;
           }
         }
+        if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
+          LOG.info("Found a corrupt block in " + logFile.getPath());
+          totalCorruptBlocks.incrementAndGet();
+          continue;
+        }
+
+        // Rollback blocks contain information of instants that are failed, collect them in a set..

Review Comment:
   This comment seems more relevant to where the rollback block is handled later.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -245,97 +286,53 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
             continue;
           }
         }
+        if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {

Review Comment:
   Can we handle this too in the switch that follows? Having a common way to handle the various block types makes the code flow easier to understand.
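
   For illustration, a minimal, self-contained sketch of what folding the corrupt-block check into the same switch could look like. The enum, method, and action strings below are hypothetical stand-ins, not the actual Hudi types; the comments map each case back to the PR's logic.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class BlockDispatchSketch {
     // Stand-in for HoodieLogBlockType; only the types relevant here.
     enum BlockType { AVRO_DATA_BLOCK, DELETE_BLOCK, COMMAND_BLOCK, CORRUPT_BLOCK }

     // One switch dispatches every block type, including CORRUPT_BLOCK,
     // instead of a separate if/continue before the switch.
     static List<String> scan(List<BlockType> blocks) {
       List<String> actions = new ArrayList<>();
       for (BlockType block : blocks) {
         switch (block) {
           case AVRO_DATA_BLOCK:
           case DELETE_BLOCK:
             actions.add("collect:" + block);   // ~ dataAndDeleteBlocks.add(logBlock)
             break;
           case COMMAND_BLOCK:
             actions.add("rollback-target");    // ~ targetRollbackInstants.add(...)
             break;
           case CORRUPT_BLOCK:
             actions.add("skip-corrupt");       // ~ totalCorruptBlocks.incrementAndGet()
             break;
           default:
             throw new UnsupportedOperationException("Block type not yet supported.");
         }
       }
       return actions;
     }

     public static void main(String[] args) {
       System.out.println(scan(List.of(BlockType.AVRO_DATA_BLOCK,
           BlockType.CORRUPT_BLOCK, BlockType.DELETE_BLOCK)));
     }
   }
   ```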



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -218,7 +221,45 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
           readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
 

Review Comment:
   Let's also remove the reverseReader argument, as reverse reading is no longer supported.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -218,7 +221,45 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
          logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
          readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);

Review Comment:
   Let's also remove the readBlocksLazily argument, as it is now required to always be true.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -245,97 +286,53 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
             continue;
           }
         }
+        if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
+          LOG.info("Found a corrupt block in " + logFile.getPath());
+          totalCorruptBlocks.incrementAndGet();
+          continue;
+        }
+
+        // Rollback blocks contain information of instants that are failed, collect them in a set..
         switch (logBlock.getBlockType()) {
           case HFILE_DATA_BLOCK:
           case AVRO_DATA_BLOCK:
           case PARQUET_DATA_BLOCK:
-            LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
-                + logBlock.getLogBlockHeader().get(INSTANT_TIME));
-            if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
-              // If this is an avro data block belonging to a different commit/instant,
-              // then merge the last blocks and records into the main result
-              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
-            }
-            // store the current block
-            currentInstantLogBlocks.push(logBlock);
-            break;
           case DELETE_BLOCK:
-            LOG.info("Reading a delete block from file " + logFile.getPath());
-            if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
-              // If this is a delete data block belonging to a different commit/instant,
-              // then merge the last blocks and records into the main result
-              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
-            }
-            // store deletes so can be rolled back
-            currentInstantLogBlocks.push(logBlock);
+            dataAndDeleteBlocks.add(logBlock);
             break;
           case COMMAND_BLOCK:
-            // Consider the following scenario
-            // (Time 0, C1, Task T1) -> Running
-            // (Time 1, C1, Task T1) -> Failed (Wrote either a corrupt block or a correct
-            // DataBlock (B1) with commitTime C1
-            // (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the attempt number is 2)
-            // (Time 3, C1, Task T1.2) -> Finished (Wrote a correct DataBlock B2)
-            // Now a logFile L1 can have 2 correct Datablocks (B1 and B2) which are the same.
-            // Say, commit C1 eventually failed and a rollback is triggered.
-            // Rollback will write only 1 rollback block (R1) since it assumes one block is
-            // written per ingestion batch for a file but in reality we need to rollback (B1 & B2)
-            // The following code ensures the same rollback block (R1) is used to rollback
-            // both B1 & B2
             LOG.info("Reading a command block from file " + logFile.getPath());
             // This is a command block - take appropriate action based on the command
             HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
-            String targetInstantForCommandBlock =
-                logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
-            switch (commandBlock.getType()) { // there can be different types of command blocks
-              case ROLLBACK_PREVIOUS_BLOCK:
-                // Rollback the last read log block
-                // Get commit time from last record block, compare with targetCommitTime,
-                // rollback only if equal, this is required in scenarios of invalid/extra
-                // rollback blocks written due to failures during the rollback operation itself
-                // and ensures the same rollback block (R1) is used to rollback both B1 & B2 with
-                // same instant_time
-                int numBlocksRolledBack = 0;
-                totalRollbacks.incrementAndGet();
-                while (!currentInstantLogBlocks.isEmpty()) {
-                  HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
-                  // handle corrupt blocks separately since they may not have metadata
-                  if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
-                    LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath());
-                    currentInstantLogBlocks.pop();
-                    numBlocksRolledBack++;
-                  } else if (targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
-                    // rollback last data block or delete block
-                    LOG.info("Rolling back the last log block read in " + logFile.getPath());
-                    currentInstantLogBlocks.pop();
-                    numBlocksRolledBack++;
-                  } else if (!targetInstantForCommandBlock
-                      .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) {
-                    // invalid or extra rollback block
-                    LOG.warn("TargetInstantTime " + targetInstantForCommandBlock
-                        + " invalid or extra rollback command block in " + logFile.getPath());
-                    break;
-                  } else {
-                    // this should not happen ideally
-                    LOG.warn("Unable to apply rollback command block in " + logFile.getPath());
-                  }
-                }
-                LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);
-                break;
-              default:
-                throw new UnsupportedOperationException("Command type not yet supported.");
+            if (commandBlock.getType().equals(ROLLBACK_BLOCK)) {
+              totalRollbacks.incrementAndGet();
+              String targetInstantForCommandBlock =
+                  logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
+              targetRollbackInstants.add(targetInstantForCommandBlock);
+            } else {
+              throw new UnsupportedOperationException("Command type not yet supported.");
             }
             break;
-          case CORRUPT_BLOCK:
-            LOG.info("Found a corrupt block in " + logFile.getPath());
-            totalCorruptBlocks.incrementAndGet();
-            // If there is a corrupt block - we will assume that this was the next data block
-            currentInstantLogBlocks.push(logBlock);
-            break;
           default:
-            throw new UnsupportedOperationException("Block type not supported yet");
+            throw new UnsupportedOperationException("Block type not yet supported.");
         }
       }
+
+      int numBlocksRolledBack = 0;
+      // This is a reverse traversal on the collected data blocks.

Review Comment:
   collected data and delete blocks.
   
   How is this a reverse traversal? Isn't the for-loop a forward traversal?
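
   For contrast, a genuinely reverse traversal over the collected blocks would index from the end of the list. A minimal sketch, where the instant strings and the applyRollbacks method are hypothetical stand-ins for the collected data/delete blocks and the rollback set:

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.List;
   import java.util.Set;

   public class ReverseTraversalSketch {
     // Drop rolled-back blocks by scanning from the most recently appended
     // block backwards; "blocks" stands in for dataAndDeleteBlocks, keyed by
     // instant time, and "rolledBackInstants" for targetRollbackInstants.
     static Deque<String> applyRollbacks(List<String> blocks, Set<String> rolledBackInstants) {
       Deque<String> kept = new ArrayDeque<>();
       for (int i = blocks.size() - 1; i >= 0; i--) {  // reverse traversal
         String instant = blocks.get(i);
         if (!rolledBackInstants.contains(instant)) {
           kept.addFirst(instant);                     // preserve original order
         }
       }
       return kept;
     }

     public static void main(String[] args) {
       // c2 was rolled back, so only c1 and c3 survive
       System.out.println(applyRollbacks(List.of("c1", "c2", "c3"), Set.of("c2")));
     }
   }
   ```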



##########
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java:
##########
@@ -839,20 +839,24 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di
     writer.appendBlock(dataBlock);
 
     // Write 2
+    header = new HashMap<>();

Review Comment:
   header.clear() also works instead of allocating a new HashMap each time.
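
   One caveat: header.clear() is only equivalent if nothing downstream retains a reference to the passed map. A minimal sketch of the two situations, where appendBlock and the appended field are hypothetical stand-ins for the writer in the test:

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class HeaderReuseSketch {
     // Stand-in for writer.appendBlock(...): records the header it was given.
     static List<Map<String, String>> appended = new ArrayList<>();

     static void appendBlock(Map<String, String> header) {
       // If the callee kept the reference as-is (no defensive copy), a later
       // header.clear() would retroactively empty earlier blocks' headers.
       appended.add(new HashMap<>(header)); // defensive copy makes reuse safe
     }

     public static void main(String[] args) {
       Map<String, String> header = new HashMap<>();
       header.put("INSTANT_TIME", "100");
       appendBlock(header);

       header.clear();                     // reuse instead of new HashMap<>()
       header.put("INSTANT_TIME", "101");
       appendBlock(header);

       System.out.println(appended);
     }
   }
   ```

   So the suggestion holds as long as the appended block copies (or immediately serializes) the header rather than holding the live map.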



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
