usberkeley commented on code in PR #11793:
URL: https://github.com/apache/hudi/pull/11793#discussion_r1737863378


##########
rfc/rfc-81/rfc-81.md:
##########
@@ -0,0 +1,122 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-81: Log Compaction with Merge Sort
+
+## Proposers
+- @usberkeley
+
+## Approvers
+- @danny0405
+
+## Status
+JIRA: https://issues.apache.org/jira/browse/HUDI-8033
+
+## Abstract
+Introduce a lightweight LogCompaction that improves write-side performance and, in some scenarios, read-side query performance (Spark, Presto, etc.), without waiting for heavyweight, time-consuming operations such as Compaction or Clustering.
+
+## Background
+The existing LogCompaction merges log files through HoodieMergedLogRecordScanner, which uses an ExternalSpillableMap internally to merge records. When the log data exceeds the available merge memory, records spill to disk, degrading write performance.
+LogCompaction with Merge Sort introduces a lightweight minor compaction that merges records through an N-way streaming merge of ordered data, improving write-side performance. Because the data is ordered, read-side query performance can also improve for queries on the primary key.
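
The N-way streaming merge described above can be sketched as follows. This is a minimal illustration, not Hudi's actual API: each input iterator yields records already sorted by key, and on duplicate keys the record from the later (newer) stream wins, mimicking log-record merging.

```java
import java.util.*;

// Minimal sketch of an N-way streaming merge over sorted record streams.
// Class and record names are illustrative placeholders.
class MergeSortSketch {
    record Rec(String key, String value) {}
    record Entry(Rec rec, int stream) {}

    static List<Rec> mergeStreams(List<Iterator<Rec>> streams) {
        // Min-heap ordered by key, then by stream index, so that for equal
        // keys the newest stream is popped last and can overwrite.
        PriorityQueue<Entry> heap = new PriorityQueue<>(
            Comparator.comparing((Entry e) -> e.rec().key())
                      .thenComparingInt(Entry::stream));
        for (int i = 0; i < streams.size(); i++) {
            if (streams.get(i).hasNext()) {
                heap.add(new Entry(streams.get(i).next(), i));
            }
        }
        List<Rec> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Entry e = heap.poll();
            Iterator<Rec> src = streams.get(e.stream());
            if (src.hasNext()) {
                heap.add(new Entry(src.next(), e.stream()));
            }
            // Duplicate key: the later-popped (newer) record replaces the older one.
            if (!out.isEmpty() && out.get(out.size() - 1).key().equals(e.rec().key())) {
                out.set(out.size() - 1, e.rec());
            } else {
                out.add(e.rec());
            }
        }
        return out;
    }
}
```

Because each stream is consumed strictly in order, only one buffered record per stream lives in memory at a time, which is what makes this merger lightweight compared with the spillable-map approach.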
+
+## Implementation
+### Common Configuration
+- Add a new configuration item for the HoodieLogBlock streaming-read buffer size. The default value is 10 MB.
+- Add a new configuration item to enable Merge Sort in LogCompaction. The default is true for Flink and false for Spark.
+### Flink
+- Add a new configuration item to enable LogCompaction. The default value is false.
+### Important Note
+1. LogCompaction with Merge Sort supports only the AVRO log format. When Merge Sort is enabled for LogCompaction, we must validate that the hoodie.logfile.data.block.format configuration item is set accordingly.
+2. Enabling Merge Sort does not guarantee that LogCompaction will use the merge-sort-based merger. If, during execution, IS_ORDERED in any LogBlock header is false (indicating that the block's data is unordered), LogCompaction falls back to the default map-based merger.
+
+### LogBlock Header
+- HeaderMetadataType: add a new enum value, IS_ORDERED, indicating whether the data in the LogBlock is ordered. The default value is false.
+
+### DeltaCommit
+When Merge Sort is enabled for LogCompaction, DeltaCommit sorts the written data by RecordKey so that records within each LogBlock are ordered, and sets the LogBlock header's IS_ORDERED to true.
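
The DeltaCommit change can be sketched as below. IS_ORDERED is the header flag this RFC adds; the method and surrounding types are simplified stand-ins, not Hudi's actual writer API.

```java
import java.util.*;

// Sketch of the DeltaCommit write path: sort the batch by record key before
// writing the log block, then mark the block header so readers can trust
// the order. Class and method names are illustrative placeholders.
class SortedBlockWriterSketch {
    static List<Map.Entry<String, byte[]>> prepareBlock(Map<String, byte[]> batch,
                                                        Map<String, String> header) {
        List<Map.Entry<String, byte[]>> records = new ArrayList<>(batch.entrySet());
        records.sort(Map.Entry.comparingByKey());   // order records by RecordKey
        header.put("IS_ORDERED", "true");           // the new header flag from this RFC
        return records;
    }
}
```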
+
+### LogCompaction
+The following sections assume that Merge Sort is enabled for LogCompaction.
+#### Flink
+Flink has not yet fully implemented the LogCompaction feature, so the following operators need to be implemented:
+1. LogCompactionPlanOperator
+2. LogCompactionOperator
+3. LogCompactionCommitSink
+#### Spark & Flink
+When LogCompaction with Merge Sort is enabled on a table with historical data, or when multiple writers are involved and one of them does not enable Merge Sort, some LogBlock data may be unordered and thus fail the preconditions for Merge Sort.
+Therefore, during execution of a LogCompaction operation, the HoodieUnMergedLogRecordScanner with skipProcessingBlocks is used to check the IS_ORDERED header of every LogBlock in that operation. If any LogBlock's data is unordered, execution falls back to the map-based merger, HoodieMergedLogRecordScanner.
+Otherwise, the new log scanner, HoodieMergeSortLogRecordScanner, performs the N-way streaming record merge.
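
The execution-time scanner choice can be sketched as follows. The scanner names are those proposed in this RFC; the header model is a simplified assumption for illustration.

```java
import java.util.*;

// Sketch of the execution-time decision: read only the block headers (cheap),
// and fall back to the map-based merger if any block is unordered. Headers
// are modeled as plain string maps for illustration.
class ScannerChoiceSketch {
    static String chooseScanner(List<Map<String, String>> blockHeaders) {
        boolean allOrdered = blockHeaders.stream()
            .allMatch(h -> Boolean.parseBoolean(h.getOrDefault("IS_ORDERED", "false")));
        return allOrdered ? "HoodieMergeSortLogRecordScanner"   // N-way merge-sort scan
                          : "HoodieMergedLogRecordScanner";     // map-based fallback
    }
}
```

Note that a missing IS_ORDERED flag is treated as false, matching the default value defined for the new header.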
+
+Additionally, we deliberately do not check the IS_ORDERED flag during the LogCompaction scheduling (plan) phase to determine the scanner type. New LogBlocks could be appended to the log file between planning and execution, and those blocks' headers would not have been checked, posing a risk of choosing the wrong scanner.
+By checking the IS_ORDERED flag during the execution phase instead, we incur only the minimal cost of reading a small amount of header information, and we completely avoid the complex design otherwise required to keep scanning ranges consistent between the scheduling plan and the execution phase.

Review Comment:
   > Are you talking about the lazy loading, we did load all the log block 
headers first before decoding the contents, and only if all the log blocks are 
sorted, we can use the sort merge read.
   
   Got it. The HoodieUnMergedLogRecordScanner with skipProcessingBlocks is well-suited for checking the LogBlock header. It is currently used for LogCompaction scheduling plans, where it skips processing the block data (skipProcessingBlocks=true).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
