nsivabalan commented on a change in pull request #4280:
URL: https://github.com/apache/hudi/pull/4280#discussion_r768735812
########## File path: rfc/rfc-27/rfc-27.md ##########

@@ -0,0 +1,409 @@

<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

# RFC-27: Data Skipping Index to Improve Query Performance

## Proposers

- @manojpec
- @shivnarayan
- @satish.kotha

## Approvers

- @rmpifer
- @uditme

## Status

JIRA: https://issues.apache.org/jira/browse/HUDI-1822

> Please keep the status updated in `rfc/README.md`.

## Abstract

Query engines typically scan large amounts of irrelevant data during query planning and execution. Some workarounds are
available to reduce the amount of irrelevant data scanned:
- Partition pruning
- File pruning
  - Some data file formats contain metadata, including range information for certain columns (for parquet, this metadata
    is stored in the footer).
  - As part of query planning, the range information from all data files is read.
  - Irrelevant data files are then pruned based on the predicates and the available range information.

Partition pruning typically puts the burden on users to select the partitions where the data may exist. The file pruning
approach is expensive and does not scale when there are a large number of partitions and data files to scan. So we propose
a new solution that stores additional information in the Hudi metadata table to implement a data skipping index. The
goals of the data skipping index are to provide:

- Global index: users can query for the information they need without having to specify partitions. The index can
  efficiently find the relevant data files in the table.
- Improved query plans: efficiently find the data files that contain data matching the query predicates.
- Support for multiple types of indexes: the initial implementation may provide only a range index, but the goal is to
  provide a flexible framework for implementing other types of indexes (e.g. bloom).

## Background

RFC-15 added metadata table support to Hudi for optimized file listing. RFC-37 is adding a metadata index and column stats
as another partition of the metadata table. This RFC piggybacks on the column stats partition that RFC-37 adds to the
metadata table.

Note: the effectiveness of the index is proportional to how the data is laid out. If every file contains data matching a
commonly specified query predicate, the index may not be very effective.

## Implementation

At a high level, there are 3 components to implementing index support:
- Storage format
- Metadata generation
- Query engine integration

### Column_Stats Index/Partition

We want to support multiple types of indexes (range, bloom, etc.), so it is important to be able to generate different
types of records for different columns. The focus of this RFC is the column range or column stats index, i.e. min/max
values, null counts, etc.
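To make the goal concrete, here is a minimal sketch (in Java, with made-up types) of the file pruning that per-file
min/max statistics enable: only files whose range can contain the predicate's literal need to be scanned.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Made-up per-file range stats for one column, as a parquet footer would provide. */
record FileRange(String fileName, long min, long max) {}

class DataSkippingExample {
  /** Prunes files that cannot contain rows matching `column = value`. */
  static List<String> filesToScan(List<FileRange> ranges, long value) {
    return ranges.stream()
        .filter(r -> r.min() <= value && value <= r.max())
        .map(FileRange::fileName)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<FileRange> ranges = List.of(
        new FileRange("f1.parquet", 0, 99),
        new FileRange("f2.parquet", 100, 199));
    // Only f2.parquet can contain value 150; f1.parquet is skipped entirely.
    System.out.println(filesToScan(ranges, 150));
  }
}
```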
Users can configure the commonly queried columns, and the column stats partition in the metadata table will store all
stats pertaining to the configured columns for every valid data file in which the column is present.

Similar to how we generate records for the files partition in the metadata table, we will generate a HoodieMetadataRecord
for the column stats partition on any commit that gets applied to the metadata table. The basic building blocks of the
metadata table used for file listing will be used for this column stats partition as well (how updates are applied to the
metadata table, how invalid data is ignored, etc.).

The column_stats partition stores statistics for all indexed columns in the Hudi data table. The index maintained in this
partition enables predicate pushdown/data skipping, i.e. file filtering based on column predicates.

For the purpose of column predicate filtering, this partition can store statistics for any column, as per the configs.

So the high-level requirement for this column_stats partition (pertaining to this RFC) is:
- Given a list of columns and predicates (and optionally partitions), return a list of matching file names.

### Storage format

To cater to the above requirement, we plan to encode the column name, partition path, and file name into the keys in
HFile. Since HFile supports efficient range/prefix search, our lookups should be very fast.

We plan to generate unique, random hash IDs for all 3 components:
- ColumnID:
  - base64(hash64(column name))
  - on-disk size = 12 bytes per col_stat per file
- PartitionID:
  - base64(hash64(partition name))
  - on-disk size = 12 bytes per partition
- FileID:
  - base64(hash128(file name))
  - on-disk size = 24 bytes per file

#### Design Choices for ID generation

1. Incremental IDs: sequentially increasing IDs can be generated in the context of the ongoing commit/write. The ID can
   always start at 1, and to make the full ID unique enough, sequential IDs can be appended with the ongoing commit time.
   - Pros:
     - The ID is simple to generate and doesn't depend on key lookups to resume ID generation across writers.
     - The overall ID can be shorter than hash-based IDs and can still be unique.
     - Differential/delta encoding works well with sequential numbers and can achieve a high compression ratio (though
       we didn't see this in our tests).
   - Cons:
     - The same column can be given several IDs across several commits, spilled over several files. Complex merging
       logic is needed to coalesce them all when looking up any columns of interest.
     - Does not play well with schema evolution. Even without schema evolution, changing IDs for the same column is
       itself a small schema evolution problem.

2. Hash IDs: hashing utilities can be used to generate unique, random IDs of any length for a given
   column/partition/file name.
   - Pros:
     - Deterministic name-to-ID generation.
     - Reverse lookup of ID to name is possible via a relatively much smaller meta index read.
     - The ID length can be controlled to match scaling needs.
     - Sharding and locality can be controlled by prefixing with more bits (doable with incremental IDs too).
   - Cons:
     - Big-scale deployments demand a huge ID space for files, thereby needing 128-bit hashes. These are usually 32 hex
       digits, taking up at least 32 bytes per ID on disk. However, base64 encoding can shave off a few bytes and get
       them down to 24 bytes.
     - Takes up more space in memory and on disk compared to sequential IDs. Theoretically, the compression ratio should
       be lower compared to sequential IDs.
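As an illustration of the hash-based scheme, here is a minimal sketch. MD5 and the truncation to 8 bytes are stand-ins
for whatever stable hash functions are finally chosen; the resulting base64 lengths match the 12/24-byte figures above.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Base64;

public class HashIds {
  // MD5 stands in for any stable 128-bit hash; truncation gives the 64-bit variant.
  private static byte[] md5(String name) throws Exception {
    return MessageDigest.getInstance("MD5").digest(name.getBytes(StandardCharsets.UTF_8));
  }

  /** 64-bit hash, base64-encoded: 12 chars on disk (column/partition IDs). */
  static String id64(String name) throws Exception {
    return Base64.getEncoder().encodeToString(Arrays.copyOf(md5(name), 8));
  }

  /** 128-bit hash, base64-encoded: 24 chars on disk (file IDs). */
  static String id128(String name) throws Exception {
    return Base64.getEncoder().encodeToString(md5(name));
  }

  public static void main(String[] args) throws Exception {
    System.out.println(id64("rider"));             // 12-char ColumnID
    System.out.println(id64("2021/12/01"));        // 12-char PartitionID
    System.out.println(id128("f1-0_001.parquet")); // 24-char FileID
  }
}
```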
Key format in the column_stats partition:
- [colId][PartitionId][FileId]
- [colId]+"agg"+[PartitionId]

The first type will be used to store one entry per column per file, and the second type will be used to store one
aggregated entry per column per partition. These are fixed-size keys, so lookups don't have to search for ID delimiters
as in the case of incremental IDs.

These key encodings fit in well to serve our requirements. Since we are using HFile as the format, all keys are going to
be sorted, and hence range reads will be very effective for our use case; we have consciously chosen the key format with
this in mind.

Given a list of columns and optionally partitions, return a list of matching file names:

1. We can do a prefix search of [ColumnID] or [ColumnID][PartitionID].
   - If both the columnId and the partitionIds are supplied, we will do a range read of [colId][partitionId].
   - If the list of partitions is not available as part of the query, we will first look up [colId]+"agg" to do a
     prefix search of the partition-level stats, filter for the partitions that match the predicates, and then proceed
     as in the previous step.
2. Fetch only the entries of interest for the [colId][partitionId] list.
3. Look up the stats and filter for matching FileIDs.
4. Reverse lookup in the files partition to get the FileID to file name mapping.

Note:
As you can see, the reverse lookup of the FileID to file name mapping has to go to the "files" partition to satisfy our
requirement. So the "files" partition will get additional entries for FileID to file name mappings on the write path.

#### Sharding

Any partition in the metadata table needs to be instantiated with N file groups/shards upfront. The "files" partition is
small, and hence we went with just one file group. But for a record-level index, we can't go with a single file group
and have to shard the data. We will employ some kind of hashing mechanism for the key to file group mapping. On the
write path, entries will be sharded and written to different file groups. On the read path, the key to be looked up will
be hashed to find the right file group to look in. For a wildcard search, all file groups will be looked up. A minimal
sketch of this mapping follows below.

// To be revisited.
We plan to instantiate the number of file groups in the column_stats partition based on the number of columns being
indexed. We can't estimate upfront the data scale to which the table might eventually grow, and hence have to go with
some estimates. A rough idea is to instantiate one file group per 10 columns being indexed, or to get rough input from
the user on whether the table will be small/medium/large scale and decide based on that.
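The key-to-file-group mapping could look like the following minimal sketch; the hash-modulo scheme and the names here
are assumptions, not the final design.

```java
public class Sharding {
  /** Maps an encoded metadata key to one of N pre-allocated file groups/shards. */
  static int fileGroupFor(String key, int numFileGroups) {
    // floorMod keeps the index non-negative even when hashCode() is negative.
    return Math.floorMod(key.hashCode(), numFileGroups);
  }

  public static void main(String[] args) {
    int numFileGroups = 4; // instantiated upfront, e.g. roughly 1 per 10 indexed columns
    String key = "colId-partitionId-fileId"; // placeholder for an encoded key
    // The write path routes the record here; the read path probes the same shard.
    System.out.println("file group = " + fileGroupFor(key, numFileGroups));
  }
}
```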
### Metadata generation

The existing metadata payload schema will be extended and shared with this new "column_stats" partition as well. The
type field will be used to detect the column stats payload record. Here is the schema for the column stats payload
record:

```
{
  "namespace": "org.apache.hudi.avro.model",
  "type": "record",
  "name": "HoodieMetadataRecord",
  "doc": "A record saved within the Metadata Table",
  "fields": [
    {
      "name": "key",
      "type": "string"
    },
    {
      "name": "type",
      "doc": "Type of the metadata record",
      "type": "int"
    },
    {
      "name": "filesystemMetadata",
      .
      .
      .
    },
    {
      "name": "ColumnStatsMetadata",
      "doc": "Contains information about column statistics for all data files in the table",
      "type": [
        "null",
        {
          "type": "record",
          "name": "HoodieColumnStats",
          "fields": [
            {
              "name": "rangeLow",
              "type": ["null", "bytes"],
              "doc": "Low end of the range. For now, this is a String. Based on the main data table schema, we can convert it to the appropriate type"
            },
            {
              "name": "rangeHigh",
              "type": ["null", "bytes"],
              "doc": "High end of the range. For now, this is a String. Based on the main data table schema, we can convert it to the appropriate type"
            },
            {
              "name": "total_values",
              "type": ["long", "null"],
              "doc": "Stores the total values for this column in the respective data file"
            },
            {
              "name": "total_nulls",
              "type": ["long", "null"],
              "doc": "Stores the total null values for this column in the respective data file"
            },
            {
              "name": "total_nans",
              "type": ["long", "null"],
              "doc": "Stores the total NaN values for this column in the respective data file"
            },
            {
              "name": "total_size_on_disk",
              "type": ["long", "null"],
              "doc": "Stores the total size occupied by this column on disk in the respective data file"
            },
            {
              "name": "isDeleted",
              "type": "boolean",
              "doc": "True if this file has been deleted"
            }
          ]
        }
      ]
    }
  ]
}
```

Column stats records hold all the stats for a file. The key for a column stat record is an encoded string, as discussed
earlier:

```
key = base64_encode(hash64(column name) + hash64(partition name) + hash128(file path))
key = base64_encode(hash64(column name) + "agg" + hash64(partition name))
```

While hash-based IDs have quite a few desirable properties in the context of Hudi index lookups, they do have an impact
on column-level schema changes. Refer to the [Schema Evolution](#Schema-Evolution) section for more details.

#### Writer flow

Let's walk through the writer flow for updating the column_stats partition in the metadata table. A sketch of step 2
follows below.

1. Files partition: prepare records to add (just calling out what's required in the context of the column_stats
   partition; the files partition in general will be updated as usual to store file listing information).
   - FileID => FileName mapping entries
   - PartitionID => PartitionName entry, if one does not already exist
   - Since these IDs are hash based, no lookup of prior usage is required here. Otherwise, we would need to know the
     last assigned ID before assigning new incremental/sequential IDs, which would slow down performance significantly.
2. Column_stats partition: prepare records to add.
   - [ColumnID][PartitionID][FileID] => ColumnStat
   - [ColumnId]"agg"[PartitionId] => ColumnStat
   - This involves reading the base file footers to fetch min/max and other stats to populate the values for the record.
3. Commit all these records to the metadata table.

We need to ensure we have all the sufficient info in the WriteStatus/commit metadata that gets passed to the metadata
writer on every commit. Reading parquet footers and metadata is unavoidable, but other than that, we should try to embed
all other info in the WriteStatus.
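Here is a rough sketch of step 2 of the writer flow; the types are simplified stand-ins for the actual
HoodieMetadataRecord payload (not the real Hudi classes), and the per-partition "agg" entries are omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Simplified stand-ins for the metadata payload; not the actual Hudi classes. */
record ColumnStat(byte[] rangeLow, byte[] rangeHigh, long totalValues, long totalNulls) {}
record MetadataEntry(String key, ColumnStat stat) {}

class ColumnStatsRecordGenerator {
  /**
   * Prepares the per-file column_stats entries for one written data file, keyed as
   * [colId][partitionId][fileId]. The IDs are assumed to come from the hash/base64
   * scheme sketched earlier; the stats come from the file's parquet footer.
   */
  static List<MetadataEntry> prepare(String partitionId, String fileId,
                                     Map<String, ColumnStat> statsByColId) {
    List<MetadataEntry> entries = new ArrayList<>();
    statsByColId.forEach((colId, stat) ->
        entries.add(new MetadataEntry(colId + partitionId + fileId, stat)));
    return entries;
  }
}
```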
### Index integrations with query engines

#### How to apply query predicates in Hudi?

Review comment:
This section will have to be revisited. With the recent support for z-ordering, we can re-use the data skipping utils we
already have in our code base. Please wait to review this section in detail.

But the high-level flow will be as follows: if z-ordering is enabled, at the end of z-order clustering the replace
commit will get applied to the metadata table. So we don't need to build any separate index for z-ordering. On the query
side, we will rely on what the column_stats partition in the metadata table gives us. If z-order clustering is complete,
the column_stats partition will reflect it; if not, it will return the current state of things. So the query-side
integration may not need any branching on whether z-ordering is enabled, and can just rely on the column_stats
partition.

One caveat: we can't get rid of the z-order index completely right away. If the metadata table is not built out yet, or
has entered an inconsistent state and is not usable, we have to fall back to the existing way of building an index at
the end of z-order clustering.

Hope this gives a high-level idea of how we plan to do the query-side integration.
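For completeness, here is a minimal sketch of the query-side lookup flow described in the storage format section; the
MetadataView interface and all names here are hypothetical, not Hudi's actual API.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical view over the metadata table partitions; not Hudi's actual API. */
interface MetadataView {
  /** Prefix scan of column_stats keys, e.g. by [colId] or [colId][partitionId]. */
  Map<String, ColStat> prefixScan(String keyPrefix);

  /** Reverse lookup of fileId -> file name from the "files" partition. */
  String fileNameFor(String fileId);
}

/** Hypothetical stats entry for one (column, file) pair. */
record ColStat(String fileId, long min, long max) {}

class ColumnStatsLookup {
  /** Returns the data files whose stats can satisfy the predicate. */
  static List<String> matchingFiles(MetadataView view, String colId, String partitionId,
                                    Predicate<ColStat> predicate) {
    return view.prefixScan(colId + partitionId).values().stream()
        .filter(predicate)                      // filter stats entries by the predicate
        .map(s -> view.fileNameFor(s.fileId())) // reverse FileID -> file name lookup
        .collect(Collectors.toList());
  }
}
```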
