usberkeley commented on code in PR #11793:
URL: https://github.com/apache/hudi/pull/11793#discussion_r1721421620


##########
rfc/rfc-81/rfc-81.md:
##########
@@ -0,0 +1,108 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-81: Log Compaction with Merge Sort
+
+## Proposers
+- @usberkeley
+
+## Approvers
+- @danny0405
+
+## Status
+JIRA: https://issues.apache.org/jira/browse/HUDI-8033
+
+## Abstract
+Add a lightweight LogCompaction to improve write-side performance and, in some scenarios, read-side query performance (Spark, Presto, etc.), without having to wait for heavy, time-consuming operations such as Compaction or Clustering.
+
+## Background
+The previous LogCompaction merged log files through HoodieMergedLogRecordScanner, which internally used an ExternalSpillableMap to merge records; spilling this map to disk caused a performance penalty.
+LogCompaction with Merge Sort is introduced to achieve lightweight minor compaction by merging records through an N-way streaming merge of ordered data, thus improving write-side performance. At the same time, because the data is ordered, read-side query performance can be improved when query predicates match the primary key.

Review Comment:
   When the SQL predicate on the query side matches the primary key or the leftmost column of the primary key, unnecessary data blocks can be skipped. Of course, the query side (Spark/Presto) needs to support this predicate pushdown; currently, our internal Presto implements it.
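   
   To make this concrete, here is a rough Java sketch of the block-skipping idea, assuming each sorted block tracks its min/max record key; the BlockRange type and pruneBlocks helper are hypothetical illustrations, not actual Hudi or Presto APIs:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class BlockSkippingSketch {
   
     /** Hypothetical per-block metadata: the key range of one sorted data block. */
     static class BlockRange {
       final String minKey;
       final String maxKey;
   
       BlockRange(String minKey, String maxKey) {
         this.minKey = minKey;
         this.maxKey = maxKey;
       }
     }
   
     /**
      * Keep only the blocks whose [minKey, maxKey] range can contain the record
      * key from an equality predicate; every other block is skipped outright.
      */
     static List<BlockRange> pruneBlocks(List<BlockRange> blocks, String predicateKey) {
       List<BlockRange> candidates = new ArrayList<>();
       for (BlockRange b : blocks) {
         if (b.minKey.compareTo(predicateKey) <= 0
             && b.maxKey.compareTo(predicateKey) >= 0) {
           candidates.add(b);
         }
       }
       return candidates;
     }
   }
   ```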



##########
rfc/rfc-81/rfc-81.md:
##########
@@ -0,0 +1,108 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-81: Log Compaction with Merge Sort
+
+## Proposers
+- @usberkeley
+
+## Approvers
+- @danny0405
+
+## Status
+JIRA: https://issues.apache.org/jira/browse/HUDI-8033
+
+## Abstract
+Add a lightweight LogCompaction to improve write-side performance and, in some scenarios, read-side query performance (Spark, Presto, etc.), without having to wait for heavy, time-consuming operations such as Compaction or Clustering.
+
+## Background
+The previous LogCompaction merged log files through HoodieMergedLogRecordScanner, which internally used an ExternalSpillableMap to merge records; spilling this map to disk caused a performance penalty.
+LogCompaction with Merge Sort is introduced to achieve lightweight minor compaction by merging records through an N-way streaming merge of ordered data, thus improving write-side performance. At the same time, because the data is ordered, read-side query performance can be improved when query predicates match the primary key.
+
+## Implementation
+### HoodieConfig, etc.
+Add a new configuration item for the HoodieLogBlock streaming-read buffer size. The default value is 10MB.

Review Comment:
   > Maybe it's based on specific impl, the data block payloads are serialized 
as a list, does the streaming style read conflicts with the payload integrity?
   
   There is no need to worry about record integrity: each AVRO record in the DataBlock has a content-length prefix, so the decoder knows whether a record is complete. If the buffer does not hold enough bytes to decode a record, we simply read the next small chunk into the buffer. This works like a streaming read; there is a detailed description in ### HoodieDataBlock.
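   
   A minimal sketch of the length-prefixed streaming decode described above, assuming a 4-byte big-endian length prefix per record; the real block encoding and reader classes may differ:
   
   ```java
   import java.io.BufferedInputStream;
   import java.io.DataInputStream;
   import java.io.EOFException;
   import java.io.IOException;
   import java.io.InputStream;
   
   public class StreamingRecordReader {
     private final DataInputStream in;
   
     public StreamingRecordReader(InputStream blockStream, int bufferSize) {
       // The BufferedInputStream plays the role of the fixed-size read buffer:
       // bytes are pulled from storage in chunks as decoding advances.
       this.in = new DataInputStream(new BufferedInputStream(blockStream, bufferSize));
     }
   
     /** Returns the next record's bytes, or null at the clean end of the block. */
     public byte[] nextRecord() throws IOException {
       int length;
       try {
         length = in.readInt(); // content-length prefix
       } catch (EOFException eof) {
         return null; // no more records in this block
       }
       byte[] payload = new byte[length];
       in.readFully(payload); // refills the buffer until the whole record arrives
       return payload;
     }
   }
   ```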



##########
rfc/rfc-81/rfc-81.md:
##########
@@ -0,0 +1,108 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-81: Log Compaction with Merge Sort
+
+## Proposers
+- @usberkeley
+
+## Approvers
+- @danny0405
+
+## Status
+JIRA: https://issues.apache.org/jira/browse/HUDI-8033
+
+## Abstract
+Add a lightweight LogCompaction to improve write-side performance and, in some scenarios, read-side query performance (Spark, Presto, etc.), without having to wait for heavy, time-consuming operations such as Compaction or Clustering.
+
+## Background
+The previous LogCompaction merged log files through HoodieMergedLogRecordScanner, which internally used an ExternalSpillableMap to merge records; spilling this map to disk caused a performance penalty.
+LogCompaction with Merge Sort is introduced to achieve lightweight minor compaction by merging records through an N-way streaming merge of ordered data, thus improving write-side performance. At the same time, because the data is ordered, read-side query performance can be improved when query predicates match the primary key.
+
+## Implementation
+### HoodieConfig, etc.
+Add a new configuration item for the HoodieLogBlock streaming-read buffer size. The default value is 10MB.
+#### Flink
+Add a new configuration item to enable LogCompaction. The default value is false.
+Note:
+1. When Flink enables LogCompaction, the default implementation is MergeSort.
+2. Currently, the log format only supports AVRO. After enabling LogCompaction, we need to verify that the hoodie.logfile.data.block.format configuration item is set correctly.
+
+#### Spark
+Add a new configuration item that controls whether MergeSort is enabled in LogCompaction. The default value is false.
+Note:
+1. Currently, the log format only supports AVRO. After MergeSort is enabled for LogCompaction, we need to verify that the hoodie.logfile.data.block.format configuration item is set correctly.
+
+### DeltaCommit
+#### Flink
+When LogCompaction is enabled, DeltaCommit sorts the written data by RecordKey so that records within each LogBlock are ordered.
+#### Spark
+When MergeSort is enabled for LogCompaction, DeltaCommit sorts the written data by RecordKey so that records within each LogBlock are ordered.

Review Comment:
   > So, this is going to incur some additional overhead wrt latency? this is 
the only downside right? I also checked the code for AppendHandle. I initially 
thought we are streaming all the way through. but we are collecting records in 
a list and then once size threshold is met, we write to file. So, already we 
are not fully streaming at the write handle layer.
   
   There may not be any additional write-latency overhead, because Merge Sort also removes the cost of the ExternalSpillableMap spilling to disk. Our preliminary tests show that data freshness is greatly improved.



##########
rfc/rfc-81/rfc-81.md:
##########
@@ -0,0 +1,108 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-81: Log Compaction with Merge Sort
+
+## Proposers
+- @usberkeley
+
+## Approvers
+- @danny0405
+
+## Status
+JIRA: https://issues.apache.org/jira/browse/HUDI-8033
+
+## Abstract
+Add a lightweight LogCompaction to improve write-side performance and, in some scenarios, read-side query performance (Spark, Presto, etc.), without having to wait for heavy, time-consuming operations such as Compaction or Clustering.
+
+## Background
+The previous LogCompaction merged log files through HoodieMergedLogRecordScanner, which internally used an ExternalSpillableMap to merge records; spilling this map to disk caused a performance penalty.
+LogCompaction with Merge Sort is introduced to achieve lightweight minor compaction by merging records through an N-way streaming merge of ordered data, thus improving write-side performance. At the same time, because the data is ordered, read-side query performance can be improved when query predicates match the primary key.
+
+## Implementation
+### HoodieConfig, etc.
+Add a new configuration item for the HoodieLogBlock streaming-read buffer size. The default value is 10MB.
+#### Flink
+Add a new configuration item to enable LogCompaction. The default value is false.
+Note:
+1. When Flink enables LogCompaction, the default implementation is MergeSort.
+2. Currently, the log format only supports AVRO. After enabling LogCompaction, we need to verify that the hoodie.logfile.data.block.format configuration item is set correctly.
+
+#### Spark
+Add a new configuration item that controls whether MergeSort is enabled in LogCompaction. The default value is false.
+Note:
+1. Currently, the log format only supports AVRO. After MergeSort is enabled for LogCompaction, we need to verify that the hoodie.logfile.data.block.format configuration item is set correctly.
+
+### DeltaCommit
+#### Flink
+When LogCompaction is enabled, DeltaCommit sorts the written data by RecordKey so that records within each LogBlock are ordered.

Review Comment:
   > I list some data types that are working as strings: string, numeric, 
timestamp, should be sufficient for most of the use cases.
   
   When merging records in Hudi, either string or primitive-type comparisons work: the RecordKey is already the primary key, so duplicate records can be merged on key equality, whether we use Merge Sort or a Map.



##########
rfc/rfc-81/rfc-81.md:
##########
@@ -0,0 +1,108 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-81: Log Compaction with Merge Sort
+
+## Proposers
+- @usberkeley
+
+## Approvers
+- @danny0405
+
+## Status
+JIRA: https://issues.apache.org/jira/browse/HUDI-8033
+
+## Abstract
+Add a lightweight LogCompaction to improve write-side performance and, in some scenarios, read-side query performance (Spark, Presto, etc.), without having to wait for heavy, time-consuming operations such as Compaction or Clustering.
+
+## Background
+The previous LogCompaction merged log files through HoodieMergedLogRecordScanner, which internally used an ExternalSpillableMap to merge records; spilling this map to disk caused a performance penalty.
+LogCompaction with Merge Sort is introduced to achieve lightweight minor compaction by merging records through an N-way streaming merge of ordered data, thus improving write-side performance. At the same time, because the data is ordered, read-side query performance can be improved when query predicates match the primary key.
+
+## Implementation
+### HoodieConfig, etc.
+Add a new configuration item for the HoodieLogBlock streaming-read buffer size. The default value is 10MB.
+#### Flink
+Add a new configuration item to enable LogCompaction. The default value is false.
+Note:
+1. When Flink enables LogCompaction, the default implementation is MergeSort.
+2. Currently, the log format only supports AVRO. After enabling LogCompaction, we need to verify that the hoodie.logfile.data.block.format configuration item is set correctly.
+
+#### Spark
+Add a new configuration item that controls whether MergeSort is enabled in LogCompaction. The default value is false.
+Note:
+1. Currently, the log format only supports AVRO. After MergeSort is enabled for LogCompaction, we need to verify that the hoodie.logfile.data.block.format configuration item is set correctly.
+
+### DeltaCommit
+#### Flink
+When LogCompaction is enabled, DeltaCommit sorts the written data by RecordKey so that records within each LogBlock are ordered.
+#### Spark
+When MergeSort is enabled for LogCompaction, DeltaCommit sorts the written data by RecordKey so that records within each LogBlock are ordered.
+
+### LogCompaction
+The following applies when LogCompaction is enabled (Flink) or when MergeSort is enabled for LogCompaction (Spark).
+#### Flink
+Flink has not yet fully implemented the LogCompaction feature, so the following operators need to be implemented:
+1. LogCompactionPlanOperator
+2. LogCompactionOperator
+3. LogCompactionCommitSink
+
+When scanning log files, use the new log scanner, HoodieMergeSortLogRecordScanner, to perform N-way streaming record merging.
+
+#### Spark
+When scanning log files, use the new log scanner, HoodieMergeSortLogRecordScanner, to perform N-way streaming record merging.
+
+### HoodieMergeSortLogRecordScanner
+Implement a min-heap whose nodes are HoodieLogBlock objects; heap nodes are compared by HoodieRecord#RecordKey.

Review Comment:
   > The string comparison may be working for most of the use cases, can you 
figure out a use case where the string representation comparison contradicts 
with the original data type? These data types should be fine: string, numeric , 
timestamp.
   
   When merging records in Hudi, either string or primitive-type comparisons work: the RecordKey is already the primary key, so duplicate records can be merged on key equality, whether we use Merge Sort or a Map. The merge only needs a consistent total order that brings equal keys together; it does not depend on the native ordering of the key's original data type.
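   
   A tiny self-contained demonstration of this point: the lexicographic order of numeric keys differs from numeric order, yet duplicate keys still end up adjacent, which is all the merge needs:
   
   ```java
   import java.util.Arrays;
   
   public class KeyOrderDemo {
     public static void main(String[] args) {
       String[] keys = {"2", "10", "10", "9"};
       // Lexicographic sort yields [10, 10, 2, 9] -- not numeric order --
       // but the duplicate "10" keys are adjacent, so a merge can combine them.
       Arrays.sort(keys);
       System.out.println(Arrays.toString(keys));
     }
   }
   ```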



##########
rfc/rfc-81/rfc-81.md:
##########
@@ -0,0 +1,108 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-81: Log Compaction with Merge Sort
+
+## Proposers
+- @usberkeley
+
+## Approvers
+- @danny0405
+
+## Status
+JIRA: https://issues.apache.org/jira/browse/HUDI-8033
+
+## Abstract
+Add a lightweight LogCompaction to improve write-side performance and, in some scenarios, read-side query performance (Spark, Presto, etc.), without having to wait for heavy, time-consuming operations such as Compaction or Clustering.
+
+## Background
+The previous LogCompaction merged log files through HoodieMergedLogRecordScanner, which internally used an ExternalSpillableMap to merge records; spilling this map to disk caused a performance penalty.
+LogCompaction with Merge Sort is introduced to achieve lightweight minor compaction by merging records through an N-way streaming merge of ordered data, thus improving write-side performance. At the same time, because the data is ordered, read-side query performance can be improved when query predicates match the primary key.
+
+## Implementation
+### HoodieConfig, etc.
+Add a new configuration item for the HoodieLogBlock streaming-read buffer size. The default value is 10MB.
+#### Flink
+Add a new configuration item to enable LogCompaction. The default value is false.
+Note:
+1. When Flink enables LogCompaction, the default implementation is MergeSort.
+2. Currently, the log format only supports AVRO. After enabling LogCompaction, we need to verify that the hoodie.logfile.data.block.format configuration item is set correctly.
+
+#### Spark
+Add a new configuration item that controls whether MergeSort is enabled in LogCompaction. The default value is false.
+Note:
+1. Currently, the log format only supports AVRO. After MergeSort is enabled for LogCompaction, we need to verify that the hoodie.logfile.data.block.format configuration item is set correctly.
+
+### DeltaCommit
+#### Flink
+When LogCompaction is enabled, DeltaCommit sorts the written data by RecordKey so that records within each LogBlock are ordered.
+#### Spark
+When MergeSort is enabled for LogCompaction, DeltaCommit sorts the written data by RecordKey so that records within each LogBlock are ordered.
+
+### LogCompaction
+The following applies when LogCompaction is enabled (Flink) or when MergeSort is enabled for LogCompaction (Spark).
+#### Flink
+Flink has not yet fully implemented the LogCompaction feature, so the following operators need to be implemented:
+1. LogCompactionPlanOperator
+2. LogCompactionOperator
+3. LogCompactionCommitSink
+
+When scanning log files, use the new log scanner, HoodieMergeSortLogRecordScanner, to perform N-way streaming record merging.
+
+#### Spark
+When scanning log files, use the new log scanner, HoodieMergeSortLogRecordScanner, to perform N-way streaming record merging.
+
+### HoodieMergeSortLogRecordScanner
+Implement a min-heap whose nodes are HoodieLogBlock objects; heap nodes are compared by HoodieRecord#RecordKey.

Review Comment:
   > this could mean that, we might have all data blocks open in some cases 
until we fully exhaust all records. Something to watch out for. We have had 
memory issues when dealing w/ 100+ log blocks.
   
   Hello @nsivabalan. Yes, we need to keep all data blocks open. How we avoid OOM is explained in ### HoodieDataBlock: we implement streaming reads, pulling one chunk into the buffer at a time (for example, 10MB); when the AVRO decoder reaches the end of the buffer, or the buffer does not hold enough bytes to decode a record, we read the next chunk into the buffer, continuing until the end of the file.
   
   In extreme cases with very many data blocks (e.g., 1000+), users can reduce the buffer size to avoid OOM.
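   
   For illustration, a rough Java sketch of the heap-based N-way merge under an assumed minimal BlockReader interface (the real HoodieMergeSortLogRecordScanner internals may differ); with N open blocks, memory stays around N times the configured buffer size, which is why lowering the buffer size helps in the 1000+ block case:
   
   ```java
   import java.util.PriorityQueue;
   import java.util.function.Consumer;
   
   public class MergeSortScannerSketch {
   
     /** Hypothetical streaming view over one sorted data block. */
     interface BlockReader {
       boolean hasNext();   // true while a current record is available
       String peekKey();    // record key of the current record
       byte[] popRecord();  // consume the current record and advance
     }
   
     static void merge(Iterable<BlockReader> blocks, Consumer<byte[]> emit) {
       // Min-heap ordered by each reader's current record key.
       PriorityQueue<BlockReader> heap =
           new PriorityQueue<>((a, b) -> a.peekKey().compareTo(b.peekKey()));
       for (BlockReader r : blocks) {
         if (r.hasNext()) {
           heap.add(r);
         }
       }
       while (!heap.isEmpty()) {
         BlockReader top = heap.poll();
         // Records with equal keys surface consecutively across readers,
         // so duplicate record keys can be combined at this point.
         emit.accept(top.popRecord());
         if (top.hasNext()) {
           heap.add(top); // re-insert the reader keyed by its next record
         }
       }
     }
   }
   ```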



##########
rfc/rfc-81/rfc-81.md:
##########
@@ -0,0 +1,108 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-81: Log Compaction with Merge Sort
+
+## Proposers
+- @usberkeley
+
+## Approvers
+- @danny0405
+
+## Status
+JIRA: https://issues.apache.org/jira/browse/HUDI-8033
+
+## Abstract
+Add a lightweight LogCompaction to improve write-side performance and, in some scenarios, read-side query performance (Spark, Presto, etc.), without having to wait for heavy, time-consuming operations such as Compaction or Clustering.
+
+## Background
+The previous LogCompaction merged log files through HoodieMergedLogRecordScanner, which internally used an ExternalSpillableMap to merge records; spilling this map to disk caused a performance penalty.
+LogCompaction with Merge Sort is introduced to achieve lightweight minor compaction by merging records through an N-way streaming merge of ordered data, thus improving write-side performance. At the same time, because the data is ordered, read-side query performance can be improved when query predicates match the primary key.

Review Comment:
   > > The write performance should have some regression because of the sort 
procedure, for query performance boost, you mean we sort the inputs by primary 
key? Can the sort key be customized?
   > 
   > Custom sorting seems hard to implement during this merge-sort process, because we need to combine records based on the arrangement of primary keys. cc @usberkeley Will the problems I mentioned occur in your implementation?
   
   As you mentioned, custom sorting is not supported.



##########
rfc/rfc-81/rfc-81.md:
##########
@@ -0,0 +1,108 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-81: Log Compaction with Merge Sort
+
+## Proposers
+- @usberkeley
+
+## Approvers
+- @danny0405
+
+## Status
+JIRA: https://issues.apache.org/jira/browse/HUDI-8033
+
+## Abstract
+Add a lightweight LogCompaction to improve write-side performance and, in some scenarios, read-side query performance (Spark, Presto, etc.), without having to wait for heavy, time-consuming operations such as Compaction or Clustering.
+
+## Background
+The previous LogCompaction merged log files through HoodieMergedLogRecordScanner, which internally used an ExternalSpillableMap to merge records; spilling this map to disk caused a performance penalty.
+LogCompaction with Merge Sort is introduced to achieve lightweight minor compaction by merging records through an N-way streaming merge of ordered data, thus improving write-side performance. At the same time, because the data is ordered, read-side query performance can be improved when query predicates match the primary key.

Review Comment:
   > The write performance should have some regression because of the sort 
procedure, for query performance boost, you mean we sort the inputs by primary 
key? Can the sort key be customized?
   
   1) The sort key cannot be customized.
   2) Write performance will be slightly degraded in DeltaCommit by the sort, but Merge Sort avoids the overhead of the ExternalSpillableMap spilling to disk. Our internal preliminary tests show that data freshness and write throughput are greatly improved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
