prasannarajaperumal commented on code in PR #5958:
URL: https://github.com/apache/hudi/pull/5958#discussion_r946528789
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##########
@@ -138,6 +144,20 @@ public class HoodieCompactionConfig extends HoodieConfig {
.sinceVersion("0.11.0")
       .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
+  public static final ConfigProperty<String> PARTITIONS_FOR_COMPACTION = ConfigProperty
+      .key("hoodie.compaction.target.partitions")
+      .defaultValue("")
+      .withDocumentation("Used by org.apache.hudi.table.action.compact.strategy.SpecificPartitionCompactionStrategy "
+          + "to filter the required partitions to compact. This takes a string value with partitions separated by comma. "
+          + "Empty value implies no filtering so all the partitions are selected.");
+
+  public static final ConfigProperty<String> PARTITIONS_FOR_LOG_COMPACTION = ConfigProperty
Review Comment:
Same as above.
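For reference, the intended semantics of the comma-separated config above could look roughly like the sketch below. This is a hypothetical, self-contained helper for illustration only, not the actual strategy code in this PR:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class PartitionFilterSketch {
  // Sketch of how a comma-separated hoodie.compaction.target.partitions value
  // could filter partitions; an empty value means "no filtering" per the docs above.
  public static List<String> filterPartitions(List<String> allPartitions, String configValue) {
    if (configValue == null || configValue.trim().isEmpty()) {
      return allPartitions; // empty value implies all partitions are selected
    }
    Set<String> wanted = Arrays.stream(configValue.split(","))
        .map(String::trim)
        .collect(Collectors.toSet());
    return allPartitions.stream().filter(wanted::contains).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> all = Arrays.asList("2022/01/01", "2022/01/02", "2022/01/03");
    System.out.println(filterPartitions(all, "2022/01/01,2022/01/03")); // [2022/01/01, 2022/01/03]
    System.out.println(filterPartitions(all, ""));                      // all three partitions
  }
}
```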
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -170,6 +180,7 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<Str
     this.forceFullScan = forceFullScan;
     this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
     this.path = basePath;
+    this.useScanV2 = useScanV2;
Review Comment:
We need a better way to version the LogRecordReader. Can we pass in a
version number for now? We can evolve this later.
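One possible shape for this, replacing the useScanV2 boolean with an explicit version that can grow past two values. Names here are illustrative only, not actual Hudi API:

```java
public class LogScanVersionSketch {
  // Hypothetical version enum for the log record reader; a boolean flag only
  // allows two states, while a versioned enum can evolve to V3 and beyond.
  public enum LogScanVersion {
    V1(1), V2(2);

    private final int versionCode;

    LogScanVersion(int versionCode) { this.versionCode = versionCode; }

    public int code() { return versionCode; }

    public static LogScanVersion fromCode(int code) {
      for (LogScanVersion v : values()) {
        if (v.versionCode == code) {
          return v;
        }
      }
      throw new IllegalArgumentException("Unknown log scan version: " + code);
    }
  }

  public static void main(String[] args) {
    // The existing useScanV2 flag maps cleanly onto the enum.
    boolean useScanV2 = true;
    LogScanVersion version = useScanV2 ? LogScanVersion.V2 : LogScanVersion.V1;
    System.out.println(version.code()); // prints 2
  }
}
```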
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java:
##########
@@ -36,7 +36,7 @@ public class HoodieCommandBlock extends HoodieLogBlock {
* Hoodie command block type enum.
*/
public enum HoodieCommandBlockTypeEnum {
- ROLLBACK_PREVIOUS_BLOCK
+ ROLLBACK_BLOCK
Review Comment:
   Is this backwards compatible? Making sure we don't persist this by name.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
}
}
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
+    currentInstantLogBlocks = new ArrayDeque<>();
+    progress = 0.0f;
+    totalLogFiles = new AtomicLong(0);
+    totalRollbacks = new AtomicLong(0);
+    totalCorruptBlocks = new AtomicLong(0);
+    totalLogBlocks = new AtomicLong(0);
+    totalLogRecords = new AtomicLong(0);
+    HoodieLogFormatReader logFormatReaderWrapper = null;
+    HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
+    HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
+    HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
+    try {
+
+      // Get the key field based on the populate meta fields config
+      // and the table type
+      final String keyField = getKeyField();
+
+      boolean enableRecordLookups = !forceFullScan;
+      // Iterate over the paths
+      logFormatReaderWrapper = new HoodieLogFormatReader(fs,
+          logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
+          readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, internalSchema);
+
+      /**
+       * Scanning log blocks and placing the compacted blocks at the right place requires two traversals.
+       * The first traversal identifies the rollback blocks and the valid data and compacted blocks.
+       *
+       * Scanning blocks is easy to do in single-writer mode, where the rollback block is right after the affected data blocks.
+       * With multi-writer mode the blocks can be out of order. An example scenario:
+       * B1, B2, B3, B4, R1(B3), B5
+       * In this case, rollback block R1 invalidates B3, which is not the previous block.
+       * This becomes more complicated if we have compacted blocks, which are data blocks created using log compaction.
+       *
+       * To solve this, run a single traversal and collect all the valid blocks that are not corrupted,
+       * along with the block instant times and the rollback blocks' target instant times.
+       *
+       * As part of the second traversal, iterate over the block instant times in reverse order.
+       * While iterating in reverse order, keep track of the final compacted instant time for each block.
+       * In doing so, when a data block is seen, include the final compacted block if it is not already added,
+       * i.e. the final compacted block which contains the merged contents.
+       * For example, B1 and B2 are merged into a compacted block called M1, and then M1, B3 and B4 are merged into
+       * another compacted block called M2. So M2 is the final block which contains all the changes of B1, B2, B3 and B4.
+       * blockTimeToCompactionBlockTimeMap will then look like
+       * (B1 -> M2), (B2 -> M2), (B3 -> M2), (B4 -> M2), (M1 -> M2)
+       * This map is updated while iterating and is used to place the compacted blocks in the correct position.
+       * This way we can have multiple layers of merged blocks and still find the correct positions of the merged blocks.
+       */
+
+      // Collect targetRollbackInstants, using which we can determine which blocks are invalid.
+      Set<String> targetRollbackInstants = new HashSet<>();
+
+      // This holds block instant time to list of blocks. Note that the log blocks can be normal data blocks or compacted log blocks.
+      Map<String, List<HoodieLogBlock>> instantToBlocksMap = new HashMap<>();
+
+      // Order of instants.
+      List<String> orderedInstantsList = new ArrayList<>();
+
+      Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
+
+      /*
+       * 1. First step: traverse in the forward direction. While traversing the log blocks, collect the following:
+       *    a. instant times
+       *    b. instant-to-log-blocks map
+       *    c. targetRollbackInstants
+       */
+      while (logFormatReaderWrapper.hasNext()) {
Review Comment:
   Should we introduce a DeltaTimeline within a file slice to abstract
   constructing the list of valid log blocks to read/merge with the base file?
   This logic is essentially building a timeline while dealing with
   out-of-order, corrupt, and rollback blocks.
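   To illustrate the reverse traversal described in the javadoc above, here is a rough, self-contained sketch of how blockTimeToCompactionBlockTimeMap could be built. The helper names and input shapes are hypothetical, not the actual PR code:

```java
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;

public class CompactionMappingSketch {
  // orderedInstants: block instant times in log order.
  // compactedSources: compacted-block instant -> instants it merged.
  // Returns each source instant mapped to the FINAL compacted block containing it.
  public static Map<String, String> buildBlockTimeToCompactionBlockTimeMap(
      List<String> orderedInstants, Map<String, List<String>> compactedSources) {
    Map<String, String> mapping = new HashMap<>();
    ListIterator<String> it = orderedInstants.listIterator(orderedInstants.size());
    while (it.hasPrevious()) {
      String instant = it.previous();
      List<String> sources = compactedSources.get(instant);
      if (sources == null) {
        continue; // plain data block, nothing to record
      }
      // If this compacted block was itself compacted later, point its
      // sources at the final compacted block instead of at this one.
      String finalTarget = mapping.getOrDefault(instant, instant);
      for (String src : sources) {
        mapping.put(src, finalTarget);
      }
    }
    return mapping;
  }

  public static void main(String[] args) {
    // M1 compacts B1,B2; M2 compacts M1,B3,B4 (the example from the javadoc).
    List<String> instants = java.util.Arrays.asList("B1", "B2", "M1", "B3", "B4", "M2");
    Map<String, List<String>> compacted = new HashMap<>();
    compacted.put("M1", java.util.Arrays.asList("B1", "B2"));
    compacted.put("M2", java.util.Arrays.asList("M1", "B3", "B4"));
    // Every source block (B1..B4 and M1) resolves to the final block M2.
    System.out.println(buildBlockTimeToCompactionBlockTimeMap(instants, compacted));
  }
}
```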
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##########
@@ -314,6 +334,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. It's"
           + " useful when storage scheme doesn't support append operation.");
+  public static final ConfigProperty<String> LOG_COMPACTION_BLOCKS_THRESHOLD = ConfigProperty
+      .key("hoodie.log.compaction.blocks.threshold")
+      .defaultValue("5")
Review Comment:
   Can you file a JIRA to track this? I think this is critical to avoid
   rewriting log files over and over again. For a streaming-heavy workload
   this can be quite costly, I suppose.
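   As a rough illustration of how the threshold above would gate log compaction (a hypothetical check, not the PR's scheduling code):

```java
import java.util.Properties;

public class LogCompactionConfigSketch {
  // Hypothetical gate: schedule log compaction only once a file slice has
  // accumulated at least hoodie.log.compaction.blocks.threshold log blocks,
  // so we avoid rewriting log files over and over for streaming workloads.
  public static boolean shouldTriggerLogCompaction(Properties props, int blocksInFileSlice) {
    int threshold = Integer.parseInt(
        props.getProperty("hoodie.log.compaction.blocks.threshold", "5"));
    return blocksInFileSlice >= threshold;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.log.compaction.blocks.threshold", "5");
    // With the default threshold of 5, a slice with 7 log blocks is a candidate.
    System.out.println(shouldTriggerLogCompaction(props, 7)); // prints true
  }
}
```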
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]