usberkeley commented on code in PR #11793: URL: https://github.com/apache/hudi/pull/11793#discussion_r1721421620
########## rfc/rfc-81/rfc-81.md: ########## @@ -0,0 +1,108 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +# RFC-81: Log Compaction with Merge Sort + +## Proposers +- @usberkeley + +## Approvers +- @danny0405 + +## Status +JIRA: https://issues.apache.org/jira/browse/HUDI-8033 + +## Abstract +Add lightweight LogCompaction to improve the writing performance of the write side, and improve the query performance of the read side (Spark/Presto, etc.) in some scenarios without having to wait for heavy and time-consuming operations such as Compaction or Clustering. + +## Background +The previous LogCompaction mainly merged log files through HoodieMergedLogRecordScanner, and used ExternalSpillableMap internally to achieve record merging, which resulted in performance loss of writing to disk. +LogCompaction with Merge Sort is introduced to achieve lightweight minor compaction by merging records through N-way streaming of ordered data, thus improving the writing performance of the write side. At the same time, thanks to the ordered data, the query performance on the read side can be improved when the primary key is met. 
Review Comment:
Because the data is ordered by key, unnecessary data blocks can be skipped when the SQL predicate on the query side matches the primary key, or the leftmost column of the primary key. The query engine (Spark/Presto) must of course support this predicate pushdown. Currently, our internal Presto implements it.

########## rfc/rfc-81/rfc-81.md: ##########
@@ -0,0 +1,108 @@
+## Implementation
+### HoodieConfig etc
+Added a new configuration item for the HoodieLogBlock streaming read buffer size. The default value is 10MB.

Review Comment:
> Maybe it's based on specific impl, the data block payloads are serialized as a list, does the streaming style read conflicts with the payload integrity?

Record integrity is not a concern: each AVRO record in the DataBlock has a content-length prefix, so the decoder can tell whether a record is complete. If the buffer does not hold enough bytes to decode a record, the reader loads the next small part into the buffer and continues. This is effectively a streaming read. I describe it in detail in the ### HoodieDataBlock section.

########## rfc/rfc-81/rfc-81.md: ##########
@@ -0,0 +1,108 @@
+#### Flink
+Added a new configuration item to enable LogCompaction. The default value is false.
+Note:
+1. After Flink enables LogCompaction, the default implementation is MergeSort.
+2. Currently, the log format only supports AVRO. After enabling LogCompaction, need to check whether the hoodie.logfile.data.block.format configuration item is correct.
+
+#### Spark
+Added a new configuration item for whether to enable MergeSort in LogCompaction. The default value is false.
+Note:
+1. Currently, the log format only supports AVRO. After LogCompaction turns on MergeSort, you need to check whether the hoodie.logfile.data.block.format configuration item is correct.
+
+### DeltaCommit
+#### Flink
+When LogCompaction is enabled, DeltaCommit sorts the written data by RecordKey to achieve orderly records in LogBlock.
+#### Spark
+When LogCompaction turns on MergeSort, DeltaCommit sorts the written data by RecordKey to achieve orderly records in LogBlock.

Review Comment:
> So, this is going to incur some additional overhead wrt latency? this is the only downside right? I also checked the code for AppendHandle. I initially thought we are streaming all the way through. but we are collecting records in a list and then once size threshold is met, we write to file. So, already we are not fully streaming at the write handle layer.

There may be no additional write-latency overhead overall, because Merge Sort also reduces the overhead of ExternalSpillableMap writing to disk. Our preliminary tests show that data freshness is greatly improved.

########## rfc/rfc-81/rfc-81.md: ##########
@@ -0,0 +1,108 @@
+### DeltaCommit
+#### Flink
+When LogCompaction is enabled, DeltaCommit sorts the written data by RecordKey to achieve orderly records in LogBlock.

Review Comment:
> I list some data types that are working as strings: string, numeric, timestamp, should be sufficient for most of the use cases.

When merging records in Hudi, RecordKey can be compared either as a string or as a primitive type. Because RecordKey is already the primary key, duplicate records can be merged either way, whether using Merge Sort or a Map.
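To make the point about string versus primitive comparison concrete, here is a minimal Java sketch of an N-way merge over runs sorted by the string form of the key. It is an illustration under stated assumptions, not Hudi code: the class `StringKeyMergeSketch`, the `Cursor` helper, the `{recordKey, payload}` array layout, and the "later run wins" rule for duplicate keys are all hypothetical stand-ins. The only requirement it relies on is that every run is sorted with the same comparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only; none of these names are Hudi classes.
class StringKeyMergeSketch {

    /** Cursor over one run that is already sorted by String.compareTo on the key. */
    private static final class Cursor {
        final int runIndex;            // position of the run, oldest first
        final Iterator<String[]> it;   // each element is {recordKey, payload}
        String[] current;

        Cursor(int runIndex, Iterator<String[]> it) {
            this.runIndex = runIndex;
            this.it = it;
            this.current = it.next();
        }

        /** Moves to the next record; returns false when the run is exhausted. */
        boolean advance() {
            if (!it.hasNext()) {
                return false;
            }
            current = it.next();
            return true;
        }
    }

    /**
     * N-way merge of sorted runs through a min-heap.
     * Assumption: for equal keys the record from the later run replaces the
     * earlier one, a stand-in for whatever merge logic the table really uses.
     */
    static List<String[]> merge(List<List<String[]>> runs) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>(
            Comparator.comparing((Cursor c) -> c.current[0])
                      .thenComparingInt(c -> c.runIndex));
        for (int i = 0; i < runs.size(); i++) {
            if (!runs.get(i).isEmpty()) {
                heap.add(new Cursor(i, runs.get(i).iterator()));
            }
        }
        List<String[]> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();
            String[] rec = c.current;
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last)[0].equals(rec[0])) {
                merged.set(last, rec);   // duplicate key: keep the newer record
            } else {
                merged.add(rec);
            }
            if (c.advance()) {
                heap.add(c);             // re-insert with its new current key
            }
        }
        return merged;
    }
}
```

Merging the runs `[("10","a1"), ("9","a2")]` and `[("10","b1"), ("2","b2")]` yields the keys `"10"`, `"2"`, `"9"` with payloads `b1`, `b2`, `a2`. The output order is lexicographic rather than numeric, yet each key appears exactly once, which is all that duplicate merging needs.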
########## rfc/rfc-81/rfc-81.md: ##########
@@ -0,0 +1,108 @@
+### LogCompaction
+The following contents are all about enabling LogCompaction or enabling MergeSort with LogCompaction.
+#### Flink
+Flink has not yet fully implemented the LogCompaction feature, so the operator needs to be implemented:
+1. LogCompactionPlanOperator
+2. LogCompactionOperator
+3. LogCompactionCommitSink
+
+When scanning log files, use the new log scanner: HoodieMergeSortLogRecordScanner to achieve N-way streaming record merging.
+
+#### Spark
+When scanning log files, use the new log scanner: HoodieMergeSortLogRecordScanner to achieve N-way streaming record merging.
+
+### HoodieMergeSortLogRecordScanner
+Implement a min-heap. The heap node is a HoodieLogBlock object. The heap node comparison uses HoodieRecord#RecrodKey.

Review Comment:
> The string comparison may be working for most of the use cases, can you figure out a use case where the string representation comparison contradicts with the original data type?

These data types should be fine: string, numeric, timestamp.
When merging records in Hudi, we can compare RecordKey either as a string or as a primitive type. Because RecordKey is already the primary key, duplicate records can be merged either way, whether using Merge Sort or a Map.

########## rfc/rfc-81/rfc-81.md: ##########
@@ -0,0 +1,108 @@
+### HoodieMergeSortLogRecordScanner
+Implement a min-heap. The heap node is a HoodieLogBlock object. The heap node comparison uses HoodieRecord#RecrodKey.
Review Comment:
> this could mean that, we might have all data blocks open in some cases until we fully exhaust all records. Something to watch out for. We have had memory issues when dealing w/ 100+ log blocks.

Hello, nsivabalan. Yes, we need to open all data blocks. I have explained how to avoid OOM in the ### HoodieDataBlock section. Specifically, we implement a streaming read that loads one part of the block into a cache at a time, for example 10MB. When the AVRO decoder reaches the end of the cache, or the cache does not hold enough bytes to decode a record, we read the next part into the cache, and so on until the end of the file. In extreme cases with a lot of data blocks, such as 1000+, users can adjust the cache size to avoid OOM.

########## rfc/rfc-81/rfc-81.md: ##########
@@ -0,0 +1,108 @@
+## Background
+The previous LogCompaction mainly merged log files through HoodieMergedLogRecordScanner, and used ExternalSpillableMap internally to achieve record merging, which resulted in performance loss of writing to disk.
+LogCompaction with Merge Sort is introduced to achieve lightweight minor compaction by merging records through N-way streaming of ordered data, thus improving the writing performance of the write side. At the same time, thanks to the ordered data, the query performance on the read side can be improved when the primary key is met.

Review Comment:
> > The write performance should have some regression because of the sort procedure, for query performance boost, you mean we sort the inputs by primary key? Can the sort key be customized? > > Customized sorting seems hard to be implemented during this merge-sort process, because in this process, we need to do record-combine based on the arrangement of primary keys together. cc @usberkeley Will the problems I mentioned occur in your implementation? As you mentioned, custom sorting is not supported.

########## rfc/rfc-81/rfc-81.md: ##########
@@ -0,0 +1,108 @@
+LogCompaction with Merge Sort is introduced to achieve lightweight minor compaction by merging records through N-way streaming of ordered data, thus improving the writing performance of the write side. At the same time, thanks to the ordered data, the query performance on the read side can be improved when the primary key is met.

Review Comment:
> The write performance should have some regression because of the sort procedure, for query performance boost, you mean we sort the inputs by primary key? Can the sort key be customized?

1) The sort key cannot be customized.
2) Write performance will be slightly degraded in DeltaCommit, but Merge Sort avoids the overhead of ExternalSpillableMap writing to disk. Our internal preliminary tests show that data freshness and write throughput are greatly improved.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
