prasannarajaperumal commented on code in PR #5958:
URL: https://github.com/apache/hudi/pull/5958#discussion_r946528789


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##########
@@ -138,6 +144,20 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .sinceVersion("0.11.0")
      .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
 
+  public static final ConfigProperty<String> PARTITIONS_FOR_COMPACTION = ConfigProperty
+      .key("hoodie.compaction.target.partitions")
+      .defaultValue("")
+      .withDocumentation("Used by org.apache.hudi.table.action.compact.strategy.SpecificPartitionCompactionStrategy "
+          + "to filter the required partitions to compact. This takes a string value with partitions separated by commas. "
+          + "An empty value implies no filtering, so all partitions are selected.");
+
+  public static final ConfigProperty<String> PARTITIONS_FOR_LOG_COMPACTION = ConfigProperty

Review Comment:
   Same as above. 
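As a rough illustration of the documented semantics (comma-separated partition list, empty value selects everything), here is a minimal sketch of how a strategy might consume this config. The class and method names are hypothetical, not the PR's actual `SpecificPartitionCompactionStrategy` code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the "hoodie.compaction.target.partitions" contract:
// an empty value means no filtering, otherwise keep only the listed partitions.
public class PartitionFilterSketch {
  public static List<String> filterPartitions(String configValue, List<String> allPartitions) {
    if (configValue == null || configValue.trim().isEmpty()) {
      // Empty config value: all partitions are selected.
      return allPartitions;
    }
    List<String> targets = Arrays.stream(configValue.split(","))
        .map(String::trim)
        .collect(Collectors.toList());
    return allPartitions.stream()
        .filter(targets::contains)
        .collect(Collectors.toList());
  }
}
```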



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -170,6 +180,7 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<Str
     this.forceFullScan = forceFullScan;
     this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
     this.path = basePath;
+    this.useScanV2 = useScanV2;

Review Comment:
   We need a better way to version the LogRecordReader. Can we pass in a 
version number for now? We can evolve this later. 
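The reviewer's suggestion of passing a version number instead of a `useScanV2` boolean could look roughly like the following. This is an illustrative sketch only; the enum and its names are not Hudi API:

```java
// Hypothetical sketch: replace the useScanV2 boolean with an explicit
// reader version, so future scan algorithms can be added without piling
// up more flags. Names here are illustrative, not the PR's actual code.
public class LogRecordReaderVersionSketch {
  public enum ScanVersion {
    V1(1), V2(2);

    private final int number;

    ScanVersion(int number) {
      this.number = number;
    }

    public int getNumber() {
      return number;
    }

    // Resolve a configured version number back to an enum constant.
    public static ScanVersion fromNumber(int number) {
      for (ScanVersion v : values()) {
        if (v.number == number) {
          return v;
        }
      }
      throw new IllegalArgumentException("Unknown scan version: " + number);
    }
  }
}
```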



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java:
##########
@@ -36,7 +36,7 @@ public class HoodieCommandBlock extends HoodieLogBlock {
    * Hoodie command block type enum.
    */
   public enum HoodieCommandBlockTypeEnum {
-    ROLLBACK_PREVIOUS_BLOCK
+    ROLLBACK_BLOCK

Review Comment:
   Is this backwards compatible? Making sure we don't persist this by name.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
     }
   }
 
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
+    currentInstantLogBlocks = new ArrayDeque<>();
+    progress = 0.0f;
+    totalLogFiles = new AtomicLong(0);
+    totalRollbacks = new AtomicLong(0);
+    totalCorruptBlocks = new AtomicLong(0);
+    totalLogBlocks = new AtomicLong(0);
+    totalLogRecords = new AtomicLong(0);
+    HoodieLogFormatReader logFormatReaderWrapper = null;
+    HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
+    HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
+    HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
+    try {
+
+      // Get the key field based on populate meta fields config
+      // and the table type
+      final String keyField = getKeyField();
+
+      boolean enableRecordLookups = !forceFullScan;
+      // Iterate over the paths
+      logFormatReaderWrapper = new HoodieLogFormatReader(fs,
+          logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
+          readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
+
+      /**
+       * Scanning log blocks and placing the compacted blocks in the right place requires two traversals.
+       * The first traversal identifies the rollback blocks and the valid data and compacted blocks.
+       *
+       * Scanning blocks is easy to do in single-writer mode, where the rollback block is right after the affected data blocks.
+       * With multi-writer mode the blocks can be out of order. An example scenario:
+       * B1, B2, B3, B4, R1(B3), B5
+       * In this case, rollback block R1 invalidates B3, which is not the previous block.
+       * This becomes more complicated if we have compacted blocks, which are data blocks created by log compaction.
+       *
+       * To solve this, run a single traversal and collect all the valid blocks that are not corrupted,
+       * along with the block instant times and the rollback blocks' target instant times.
+       *
+       * As part of the second traversal, iterate over the block instant times in reverse order.
+       * While iterating in reverse order, keep track of the final compacted instant time for each block.
+       * In doing so, when a data block is seen, include the final compacted block if it has not already been added.
+       *
+       * This finds the final compacted block, which contains the merged contents.
+       * For example, B1 and B2 are merged to create a compacted block called M1, and then M1, B3 and B4 are merged to
+       * create another compacted block called M2. So M2 is the final block containing all the changes of B1, B2, B3 and B4.
+       * blockTimeToCompactionBlockTimeMap will then look like
+       * (B1 -> M2), (B2 -> M2), (B3 -> M2), (B4 -> M2), (M1 -> M2)
+       * This map is updated while iterating and is used to place the compacted blocks in the correct position.
+       * This way we can have multiple layers of merged blocks and still find the correct positions of the merged blocks.
+       */
+
+      // Collect targetRollbackInstants, using which we can determine which blocks are invalid.
+      Set<String> targetRollbackInstants = new HashSet<>();
+
+      // This holds block instant time to list of blocks. Note that the log blocks can be normal data blocks or compacted log blocks.
+      Map<String, List<HoodieLogBlock>> instantToBlocksMap = new HashMap<>();
+
+      // Order of Instants.
+      List<String> orderedInstantsList = new ArrayList<>();
+
+      Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
+
+      /*
+       * 1. First step is to traverse in the forward direction. While traversing the log blocks, collect the following:
+       *    a. instant times
+       *    b. instant to log blocks map
+       *    c. targetRollbackInstants
+       */
+      while (logFormatReaderWrapper.hasNext()) {

Review Comment:
   Should we introduce a DeltaTimeline within a file slice to abstract constructing the list of valid log blocks to read/merge with the base file? This logic is essentially building the timeline while dealing with out-of-order, corrupt, and rollback blocks.
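The reverse traversal described in the Javadoc above can be sketched standalone as follows. `buildMapping` and its inputs are illustrative, not the PR's actual code; it reproduces the example mapping (B1 -> M2), (B2 -> M2), (B3 -> M2), (B4 -> M2), (M1 -> M2):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch (not Hudi code) of the second traversal: each compacted
// block lists the instant times it merged. Walking instants newest-to-oldest,
// every merged instant is pointed at the final (outermost) compacted block,
// chaining through intermediate compacted blocks like M1.
public class CompactionMappingSketch {
  // orderedInstants: block instant times in log order (oldest first).
  // compactedBlocks: compacted-block instant -> instants it merged.
  public static Map<String, String> buildMapping(List<String> orderedInstants,
                                                 Map<String, List<String>> compactedBlocks) {
    Map<String, String> blockTimeToCompactionBlockTime = new HashMap<>();
    // Iterate newest first so the outermost compacted block is seen before
    // the blocks it merged.
    for (int i = orderedInstants.size() - 1; i >= 0; i--) {
      String instant = orderedInstants.get(i);
      if (compactedBlocks.containsKey(instant)) {
        // If this compacted block was itself merged into a later one,
        // forward its children to that final block; otherwise it is final.
        String finalTarget = blockTimeToCompactionBlockTime.getOrDefault(instant, instant);
        for (String merged : compactedBlocks.get(instant)) {
          blockTimeToCompactionBlockTime.put(merged, finalTarget);
        }
      }
    }
    return blockTimeToCompactionBlockTime;
  }
}
```

Running this on the example from the comment (M1 merges B1, B2; M2 merges M1, B3, B4) maps every earlier block and intermediate compacted block to M2.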



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##########
@@ -314,6 +334,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. It's"
           + " useful when storage scheme doesn't support append operation.");
 
+  public static final ConfigProperty<String> LOG_COMPACTION_BLOCKS_THRESHOLD = ConfigProperty
+      .key("hoodie.log.compaction.blocks.threshold")
+      .defaultValue("5")

Review Comment:
   Can you file a jira to track this? I think this is critical to avoid rewriting log files over and over again. For a streaming-heavy workload this can be quite costly, I suppose.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
