xiarixiaoyao commented on code in PR #7080:
URL: https://github.com/apache/hudi/pull/7080#discussion_r1018740543
##########
rfc/rfc-64/rfc-64.md:
##########
@@ -0,0 +1,509 @@

<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements. See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

# RFC-64: New Hudi Table Spec API for Query Integrations

## Proposers

- @codope
- @alexeykudinkin

## Approvers

- @xiarixiao
- @danny0405
- @vinothchandar
- @prasanna
- @xushiyan

## Status

JIRA: [HUDI-4141](https://issues.apache.org/jira/browse/HUDI-4141)

## Abstract

In this RFC we propose a new set of higher-level Table Spec APIs that would allow
us to up-level our current integration model with new Query Engines, enabling
faster turnaround for such integrations.

## Background

[WIP] Plan

- [x] High-level capabilities the API will be supporting
- [ ] High-level overview of integration points of the query engines
- [ ] Deep-dive into existing Spark/Flink/Trino integration points

The figure below shows the read path in Spark and Presto/Trino query engines. At
a high level, integration with these engines requires providing the following capabilities
(a rough illustrative sketch follows this list):

1. **Listing**: this stage requires enumerating all of the data files within a table representing a particular
snapshot of its state, which will subsequently be scanned to fetch the data requested by the target query. All query engines
expect the output of this stage in the form of balanced file "splits", allowing any skew in
file sizes to be evened out. This stage is where various _pruning_ techniques, such as partition pruning and file-level
column-statistics filtering, are applied.

2. **Reading**: this stage transforms every file split (identified in the previous stage) into an iterator over
the records persisted in it. This stage is where the actual data shaping to suit the needs of a particular query takes
place: records are projected into the schema expected by the query, corresponding filters are pushed down to
reduce the amount of data fetched from storage, schema evolution is handled, and delayed operations are reconciled (merging/deleting).
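To make the two capabilities above concrete, here is a minimal, purely illustrative sketch of what such a
listing/reading surface could look like. None of the names below (`TableScan`, `FileSplit`, `Record`) are part of
this RFC's proposal; they are hypothetical placeholders mirroring the two stages.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical placeholder types, for illustration only (not proposed API).
class FileSplit {
  String filePath;   // base (or log) file backing this split
  long start;        // byte offset the split starts at
  long length;       // split length, balanced across files to even out skew
}

class Record { /* engine-specific row representation */ }

interface TableScan {
  // Listing: enumerate the table's data files as of the given instant, after
  // partition pruning and column-stats filtering, returned as balanced splits.
  List<FileSplit> listSplits(String asOfInstant);

  // Reading: turn a single split into an iterator over its records, with the
  // query's projection (and pushed-down filters) applied and any pending
  // merges/deletes from delta-logs already reconciled.
  Iterator<Record> read(FileSplit split, List<String> projectedColumns);
}
```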
## Implementation

[WIP] Plan

- [x] Components & API pseudo-code
- [x] Diagram showcasing data flow and integration points
- [ ] Example integration walk-through

With this RFC, we aim to achieve the following:

 - Up-level our current integration model with Query Engines, abstracting away Hudi's lower-level components and
   instead providing simple and elegant abstractions for the necessary capabilities (listing, reading)

 - Make sure Hudi's APIs are high-level enough to be useful in providing programmatic access to the data
   for users willing to access it directly

To achieve that, we propose to introduce two tiers of APIs:

1. **Mid-level** (engine-friendly): these APIs will abstract away Hudi's internals behind
   simple and familiar concepts and components such as file splits, iterators, statistics, etc.

2. **High-level** (user-friendly): these APIs will provide programmatic access to Hudi's capabilities,
   bypassing query-engine interfaces (like SQL, Spark DS, etc.)

Following a classic [layer-cake architecture](https://cs.uwaterloo.ca/~m2nagapp/courses/CS446/1195/Arch_Design_Activity/Layered.pdf)
allows us to abstract away the complexity of the lower-level components and APIs from
Query Engines (on the *querying* side) as well as from end users:

 - Mid-level components will internally leverage Hudi's core building blocks (such as
`FileSystemView`, `FileIndex`, `MergedLogRecordScanner`, `FileReader`, etc.), while exposing APIs
providing the capabilities expected by the Query Engines (listed above)

 - High-level components will be stacked on top of the mid-level ones and can be used directly by users to
read and write tables.

These APIs will be built bottom-up, initially focusing on the mid-level
components, then the higher-level ones. Once the mid-level components are ready, we could
demonstrate their utility by integrating with one of the query engines.

In the initial phase of this project we will focus the effort on the read side of the integration,
shifting focus to the write side in the subsequent phase.

### Components & API

Open Qs

1. ~~Some of the models defined here do already exist and might be representing
   lower-level components. Do we expose them in higher-level APIs or do we
   bifurcate and hide them as internal impl details (for ex, `HoodieFileGroup`
   vs `HoodieInternalFileGroup`)?~~
   1. *We'll be re-using existing models, abstracting them behind projected interfaces
      exposing only the necessary functionality and hiding away the complexity of the whole impl*
2. ~~What about Expressions? Wrapping is going to be hard because we need to
   analyze expression structure, which is not going to be possible if we wrap.~~
   1. We will need to introduce our own Expression hierarchy supporting a *superset*
      of the commonly used expressions. While this requires quite some heavy lifting
      initially, it is unfortunately the only option, given that we need to
      have deep introspection into these expression trees (in the analyses we run
      within the Column Stats Index, Partition Pruning, etc.)
   2. However, we won't be implementing an Expression Execution Engine: for execution
      we will continue to rely on the execution engine's machinery, simply relaying from
      our own representation into the engine-specific one (see the sketch after this list)
3. Do we need to implement a Catalog backend? We’d be fine with just the session
   catalog for now.
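As a rough, non-normative sketch of point 2 above (an engine-agnostic expression tree that Hudi's analyses can
introspect, with execution delegated back to the engine by converting the tree into the engine's own predicate
type), assuming hypothetical class names that are not part of the proposal:

```java
// Hypothetical sketch only: class names are placeholders, not the proposed hierarchy.
abstract class Expression { }

class AttributeRef extends Expression {
  final String columnName;
  AttributeRef(String columnName) { this.columnName = columnName; }
}

class Literal extends Expression {
  final Object value;
  Literal(Object value) { this.value = value; }
}

class GreaterThan extends Expression {
  final Expression left, right;
  GreaterThan(Expression left, Expression right) { this.left = left; this.right = right; }
}

// Hudi-side analyses (column-stats pruning, partition pruning) introspect the tree directly;
// for execution the same tree is relayed into the engine's native predicate representation
// (e.g. a Spark, Flink or Trino expression/filter) rather than evaluated by Hudi itself.
interface ExpressionConverter<E> {
  E convert(Expression expression);
}
```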
#### Models

We define the following models as core building blocks of the new mid-/high-level APIs.

**TableId** is used to identify an individual table (for ex, within a catalog)

```java
class TableId {
  final String databaseName;
  final String tableName;
}
```

**TableDef** captures the basic meta information of the table

```java
class TableDef {
  final TableId tableId;

  // Table's base path
  final String basePath;
  // Table's latest schema
  final InternalSchema schema;
  // Table's type (COW, MOR)
  final HoodieTableType tableType;

  // ...
}
```

##### File Group/Slice

In Hudi's glossary, the following terms occur frequently:

 - File Group: a collection of snapshots (or versions) of a group of records, where an individual snapshot
   is called a *file slice*
 - File Slice: an individual snapshot of the group of records

Physically, a file slice is usually represented as:

- For COW: a *base file*, representing a single snapshot (version) of the group of records,
  co-located in a single columnar-based file

- For MOR: a *base file* and corresponding delta-log files. For MOR, unlike COW, a *file slice* actually
constitutes a collection of snapshots (versions) of the group of records, where:
  - The base file represents the initial snapshot/version of the group of records
  - Delta-log files encode updated versions of the records stored in the base file

```java
// NOTE: This model is the same as an existing one (ref'd here for notational purposes)
class HoodieFileSlice {
  // Id of the file-group this slice belongs to
  HoodieFileGroupId fileGroupId;
  // Base file's path
  String baseFilePath;
  // Ordered list of the delta-log files
  TreeSet<HoodieLogFile> logFilesPaths;
  // ...
}
```

NOTE: In the case of COW the set of records within the File Group wouldn't change.

```java
// NOTE: This model is the same as an existing one (ref'd here for notational purposes)
class HoodieFileGroup {
  // Id of the file-group
  HoodieFileGroupId id;
  // Projection of the timeline of committed actions affecting file-slices in this group
  HoodieTimeline timeline;
  // File-slices ordered by the corresponding instant they've been committed at
  TreeMap<HoodieInstant, FileSlice> fileSlices;
  // ...
}
```

**PartitionDescriptor** describes a particular partition within the dataset.

```java
class PartitionDescriptor {
  // Relative partition path within the dataset
  String partitionPath;
  // List of values for corresponding partition columns (partition values)
  List<Object> partitionColumnValues;
}
```

**PartitionSnapshot** represents the state of a single table partition at a
particular instant T, comprised of the file-slices that are the _latest_
(current) within their corresponding file-groups at instant T.

NOTE: Non-partitioned tables are assumed to have 1 partition enclosing the whole
table.

```java
class PartitionSnapshot {
  PartitionDescriptor descriptor;
  // (Latest) Instant T as of which the partition's state is represented
  HoodieInstant latestInstant;
  // File-slices current at instant T
  List<HoodieFileSlice> fileSlices;
}
```

**PartitionIncrementalSnapshot** represents the state of a single table
partition at a particular instant T2, considering only commits that
occurred in the table after T1 (ie, within the timeline T1 -> T2),
comprised of the file-slices that are the _latest_ (current) within their
corresponding file-groups at instant T2.

NOTE: `PartitionSnapshot` is a special case of `PartitionIncrementalSnapshot` where
T1 = 0.

```java
class PartitionIncrementalSnapshot {
  PartitionDescriptor descriptor;
  // Timeline starting at T1 and ending at T2, whose completed actions
  // represent the state of the partition
  HoodieTimeline timeline;
  // File-slices current at instant T2
  List<HoodieFileSlice> fileSlices;
}
```
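As an illustration of how a listing-side component might consume these models, here is a small hypothetical
sketch (the `PartitionPruner` helper is assumed for illustration only, not part of the proposal) that prunes
partitions by a predicate over their partition-column values; the file-slices of the surviving snapshots are
what listing would then expand into splits.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helper, for illustration only.
class PartitionPruner {
  // Keep only the partitions whose partition-column values satisfy the predicate.
  static List<PartitionSnapshot> prune(List<PartitionSnapshot> partitions,
                                       Predicate<List<Object>> partitionPredicate) {
    return partitions.stream()
        .filter(p -> partitionPredicate.test(p.descriptor.partitionColumnValues))
        .collect(Collectors.toList());
  }
}
```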
**ReadingMode** represents the query type.

```java
enum ReadingMode {
  // Representing the state of the table at a particular instant T
  SNAPSHOT,
  // (MOR only) Representing the state of the table at a particular instant T,
  // reading exclusively base files (ie, ignoring any delta-logs if present)
  SNAPSHOT_READ_OPTIMIZED,
  // Representing the state of the table at a particular instant T_y, assuming
  // T_x as the initial instant (SNAPSHOT is a special case of this mode, where T_x = 0)
  INCREMENTAL,
  // Representing the state of the table as a stream of CRUD operations on
  // individual records
  CDC
}
```

**Expression** (**Predicate**)

Review Comment:
   Is this only used for data skipping / partition pruning? We don't think those predicates should be used for the engine's scan action.
