usberkeley commented on code in PR #11793:
URL: https://github.com/apache/hudi/pull/11793#discussion_r1737863378
##########
rfc/rfc-81/rfc-81.md:
##########
@@ -0,0 +1,122 @@
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements. See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

# RFC-81: Log Compaction with Merge Sort

## Proposers
- @usberkeley

## Approvers
- @danny0405

## Status
JIRA: https://issues.apache.org/jira/browse/HUDI-8033

## Abstract
Add a lightweight LogCompaction that improves write-side performance and, in some scenarios, read-side query performance (Spark, Presto, etc.), without waiting for heavyweight, time-consuming operations such as Compaction or Clustering.

## Background
The existing LogCompaction merges log files through HoodieMergedLogRecordScanner, which uses an ExternalSpillableMap internally to merge records; when the amount of log data exceeds the available merge memory, the map spills to disk and incurs a performance penalty.
LogCompaction with Merge Sort is introduced to achieve lightweight minor compaction by merging records through an N-way streaming merge of ordered data, thus improving write-side performance. At the same time, because the data is ordered, read-side query performance can improve for queries that filter on the primary key.
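The N-way streaming merge described above can be sketched with a priority queue over the sorted runs. The following is a minimal, self-contained illustration (class and method names are hypothetical, not Hudi's actual APIs): memory stays bounded at roughly one buffered record per run, which is what removes the need for an ExternalSpillableMap. On duplicate keys, the run with the higher index (standing in for the newer log block) wins.

```java
import java.util.*;

// Minimal sketch of an N-way streaming merge over sorted runs (hypothetical
// names; not Hudi's actual classes). Each run is sorted by record key; for
// duplicate keys the run with the higher index wins (i.e., the newer block).
public class NWayMerge {
    public static List<Map.Entry<String, String>> merge(
            List<List<Map.Entry<String, String>>> runs) {
        // Heap entry: {runIndex, positionInRun}; ordered by key, then newest run first.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> {
            int c = runs.get(a[0]).get(a[1]).getKey()
                        .compareTo(runs.get(b[0]).get(b[1]).getKey());
            return c != 0 ? c : Integer.compare(b[0], a[0]);
        });
        for (int i = 0; i < runs.size(); i++) {
            if (!runs.get(i).isEmpty()) heap.add(new int[]{i, 0});
        }
        List<Map.Entry<String, String>> out = new ArrayList<>();
        String lastKey = null;
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            Map.Entry<String, String> rec = runs.get(top[0]).get(top[1]);
            // The first occurrence of a key is the newest; drop later duplicates.
            if (!rec.getKey().equals(lastKey)) {
                out.add(rec);
                lastKey = rec.getKey();
            }
            if (top[1] + 1 < runs.get(top[0]).size()) {
                heap.add(new int[]{top[0], top[1] + 1});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, String>>> runs = List.of(
            List.of(Map.entry("a", "v1"), Map.entry("c", "v1")),
            List.of(Map.entry("a", "v2"), Map.entry("b", "v2")));
        System.out.println(merge(runs)); // key "a" resolves to the newer run's value
    }
}
```

This is the standard k-way merge; the point of the sketch is only that merging N sorted streams needs a heap of size N rather than a spillable map of all records.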
## Implementation

### Common Configuration
- Add a new configuration item for the HoodieLogBlock streaming read buffer size. The default value is 10 MB.
- Add a new configuration item that controls whether LogCompaction uses MergeSort. The default value is true for Flink and false for Spark.

### Flink
- Add a new configuration item to enable LogCompaction. The default value is false.

### Important Note
1. LogCompaction with Merge Sort only supports the AVRO log format. When MergeSort is enabled for LogCompaction, we need to verify that the hoodie.logfile.data.block.format configuration item is set accordingly.
2. Enabling MergeSort does not guarantee that LogCompaction will actually use the merge-sort-based merger. If, during execution, the IS_ORDERED header of any LogBlock is found to be false (indicating that the LogBlock data is unordered), LogCompaction falls back to the default map-based merger.

### LogBlock Header
- HeaderMetadataType: add a new enumeration value, IS_ORDERED, indicating whether the data in the LogBlock is ordered. The default value is false.

### DeltaCommit
When MergeSort is enabled for LogCompaction, DeltaCommit sorts the written records by RecordKey so that each LogBlock is ordered, and sets the LogBlock header's IS_ORDERED to true.

### LogCompaction
The following applies when MergeSort is enabled for LogCompaction.

#### Flink
Flink has not yet fully implemented the LogCompaction feature, so the following operators need to be implemented:
1. LogCompactionPlanOperator
2. LogCompactionOperator
3. LogCompactionCommitSink

#### Spark & Flink
When LogCompaction with Merge Sort is enabled on an existing (historical) table, or when multiple writers are involved and one of them does not enable Merge Sort, some LogBlock data may be unordered and therefore not meet the conditions for running MergeSort.
Therefore, during the execution of a LogCompaction operation, the HoodieUnMergedLogRecordScanner with skipProcessingBlocks is used to check the IS_ORDERED header of all LogBlocks in that operation. If the data within any LogBlock is not ordered, LogCompaction falls back to the map-based merger, HoodieMergedLogRecordScanner.
Otherwise, the new log scanner, HoodieMergeSortLogRecordScanner, performs the N-way streaming record merge.

Additionally, we deliberately do not check the IS_ORDERED flag during the LogCompaction scheduling-plan phase to decide the scanner type. The main concern is that new LogBlocks might be appended to the log file between planning and execution, and those blocks would never have had their IS_ORDERED headers checked, posing a risk.
By checking the IS_ORDERED flag during the execution phase instead, we incur only a minimal performance cost from reading a small amount of header information, and we completely avoid the complex design that would otherwise be required to keep the scanning ranges consistent between the scheduling plan and the operation execution.

Review Comment:
   > Are you talking about the lazy loading, we did load all the log block headers first before decoding the contents, and only if all the log blocks are sorted, we can use the sort merge read.

   Got it. The HoodieUnMergedLogRecordScanner with skipProcessingBlocks is well-suited for checking the LogBlock header. It is currently used for LogCompaction scheduling plans, and it skips processing the block data (skipProcessingBlocks=true).
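The execution-phase header pre-check and fallback discussed in this thread could be sketched as follows. The types and names here are illustrative stand-ins, not Hudi's actual scanner API; the point is only that a cheap pass over block headers decides the merger before any block contents are decoded.

```java
import java.util.*;

// Hypothetical sketch of merger selection (names are illustrative, not
// Hudi's actual APIs): read only the headers of all log blocks in the
// operation, and use the merge-sort scanner only if every block advertises
// IS_ORDERED=true; otherwise fall back to the map-based scanner.
public class MergerSelector {
    // A log block header is modeled here as a simple key/value map.
    public static String selectMerger(List<Map<String, String>> blockHeaders) {
        boolean allOrdered = blockHeaders.stream()
            // IS_ORDERED defaults to false, so blocks written before the flag
            // existed (or by writers without Merge Sort) force the fallback.
            .allMatch(h -> "true".equals(h.getOrDefault("IS_ORDERED", "false")));
        return allOrdered ? "HoodieMergeSortLogRecordScanner"
                          : "HoodieMergedLogRecordScanner";
    }

    public static void main(String[] args) {
        System.out.println(selectMerger(List.of(
            Map.of("IS_ORDERED", "true"), Map.of("IS_ORDERED", "true"))));
        // A block with no IS_ORDERED header counts as unordered:
        System.out.println(selectMerger(List.of(
            Map.of("IS_ORDERED", "true"), Map.of())));
    }
}
```

Treating a missing header as unordered is what makes the scheme safe for historical tables and mixed-writer deployments, at the cost of falling back more often than strictly necessary.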
