vinothchandar commented on code in PR #7235:
URL: https://github.com/apache/hudi/pull/7235#discussion_r1342511107


##########
rfc/rfc-63/rfc-63.md:
##########
@@ -0,0 +1,418 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# RFC-63: Functional Indexes
+
+## Proposers
+
+- @yihua
+- @alexeykudinkin
+- @codope
+
+## Approvers
+
+- @vinothchandar
+- @xushiyan
+- @nsivabalan
+
+## Status
+
+JIRA: [HUDI-512](https://issues.apache.org/jira/browse/HUDI-512)
+
+## Abstract
+
+In this RFC, we propose **Functional Indexes**, a new capability in
+Hudi's [multi-modal indexing](https://hudi.apache.org/blog/2022/05/17/Introducing-Multi-Modal-Index-for-the-Lakehouse-in-Apache-Hudi)
+subsystem. Beyond accelerating queries, functional indexes reshape partitions as another layer of
+the indexing system, abstracting them from the traditional fixed notion while providing both
+flexibility and performance.
+
+## Background
+
+Hudi employs multi-modal indexing to optimize query performance. These indexes, ranging from a simple files index to
+record-level indexing, cater to a diverse set of use cases, enabling efficient point lookups and reducing the data
+scanned during queries. This is usually done in two ways:
+
+- **Partition pruning**: Partition pruning relies on a table with physical partitioning, such as Hive partitioning.
+  A partitioned table uses a chosen column, such as the date of a `timestamp`, and stores the rows with the same date in
+  files under the same folder or physical partition, such as `date=2022-10-01/`. When the predicate in a query
+  references the partition column of the physical partitioning, the files in the partitions not matching the predicate
+  are filtered out without scanning. For example, for the predicate `date between '2022-10-01' and '2022-10-02'`,
+  partition pruning only returns the files from two partitions, `2022-10-01` and `2022-10-02`, for further processing.
+  The granularity of the pruning is at the partition level.
+
+
+- **Data skipping**: Skipping data at the file level, with the help of column stats or a record-level index. For
+  example, with a column stats index containing the minimum and maximum values of a column for each file, the files
+  falling outside the range of values in the predicate can be pruned. For a predicate `age < 20`, file pruning filters
+  out a file with column stats of `[30, 40]` as the minimum and maximum values of the column `age`.
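To make the pruning rule concrete, here is a minimal sketch (hypothetical file names and stats, not Hudi's actual implementation) of how file-level min/max column stats answer a predicate like `age < 20`:

```python
# Hypothetical file-level column stats: file name -> (min, max) of the `age` column.
column_stats = {
    "base_file_1.parquet": (15, 25),
    "base_file_2.parquet": (30, 40),
    "base_file_3.parquet": (10, 18),
}

def prune_files_for_less_than(stats, threshold):
    """Keep only files whose value range can overlap the predicate `col < threshold`."""
    return [f for f, (lo, hi) in stats.items() if lo < threshold]

# `base_file_2.parquet` is skipped: its minimum (30) already fails `age < 20`.
candidates = prune_files_for_less_than(column_stats, 20)
print(sorted(candidates))  # ['base_file_1.parquet', 'base_file_3.parquet']
```

Only files whose stats range can overlap the predicate are scanned; the rest are skipped without any I/O on the data files.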
+
+While Hudi already supports partition pruning and data skipping for different query engines, we
+recognize that the following use cases need better query performance and usability:
+
+- Data skipping based on functions defined on column(s)
+- Support for different storage layouts and viewing partitions as an index
+- Support for secondary indexes
+
+Next, we explain these use cases in detail.
+
+### Use Case 1: Data skipping based on functions defined on column(s)
+
+Let's consider a non-partitioned table containing events with a `timestamp` column. The events, with naturally
+increasing time, are ingested into the table with bulk inserts every hour. In this case, assume that each file
+contains rows for a particular hour:
+
+| File Name            | Min of `timestamp` | Max of `timestamp` | Note               |
+|----------------------|--------------------|--------------------|--------------------|
+| base_file_1.parquet  | 1664582400         | 1664586000         | 2022-10-01 12-1 AM |
+| base_file_2.parquet  | 1664586000         | 1664589600         | 2022-10-01 1-2 AM  |
+| ...                  | ...                | ...                | ...                |
+| base_file_13.parquet | 1664625600         | 1664629200         | 2022-10-01 12-1 PM |
+| base_file_14.parquet | 1664629200         | 1664632800         | 2022-10-01 1-2 PM  |
+| ...                  | ...                | ...                | ...                |
+| base_file_37.parquet | 1664712000         | 1664715600         | 2022-10-02 12-1 PM |
+| base_file_38.parquet | 1664715600         | 1664719200         | 2022-10-02 1-2 PM  |
+
+For a query to get the number of events between 12 PM and 2 PM each day in a month for time-of-day analysis, the
+predicates look like `DATE_FORMAT(timestamp, '%Y-%m-%d') between '2022-10-01' and '2022-10-31'`
+and `DATE_FORMAT(timestamp, '%H') between '12' and '13'`. If the data is laid out well, as above, we only need to scan
+two files (instead of 24 files) for each day of data, e.g., `base_file_13.parquet` and `base_file_14.parquet` containing
+the data for 2022-10-01 12-2 PM.
+
+Currently, such fine-grained data skipping based on a function of a column cannot be achieved in Hudi, because
+transforming the `timestamp` to the hour of day is not order-preserving; thus, file pruning cannot directly leverage
+the file-level column stats of the original `timestamp` column. In this case, Hudi has to scan all the files for a
+day and push the predicate down when reading parquet files, increasing the amount of data to be scanned.
+
+### Use Case 2: Support for different storage layouts and viewing partitions as an index
+
+Today, partitions are mainly viewed as a query optimization technique, and partition pruning certainly helps to improve
+query performance. However, if we think about it, partitions are really a storage optimization technique. Partitions
+help you organize the data for your convenience while balancing cloud storage scaling issues (e.g., throttling or
+having too many files/objects under one path). From a query optimization perspective, partitions are really just a
+coarse index. We can achieve the same goals as partition pruning with indexes.
+
+In this RFC, we propose that how data is partitioned (hive-style, hashed/random prefix for cloud throttling) can be
+decoupled from how the data is queried. There can be different layouts:
+
+1. Files are stored under a base path, partitioned hive-style.
+2. Files are stored under random prefixes attached to a base path, still hive-style partitioned (RFC-60),
+   e.g. `s3://<random-prefix1>path/to/table/partition1=abc`, `s3://<random_prefix2>path/to/table/partition1=xyz`.
+3. Files are stored across different buckets, completely scattered on cloud storage, e.g. `s3://a/b/c/f1`, `s3://x/y/f2`.
+4. Partitions can evolve. For instance, if you have an old Hive table which is horribly partitioned, can we ensure
+   that the new data is not only partitioned well but queries are also able to efficiently skip data, without
+   rewriting the old data with the new partition spec?
+
+Consider a case where event logs are streamed from microservices and ingested into a raw event table. Each event log
+contains a `timestamp` and an associated organization ID (`org_id`). Most queries on the table are organization-specific
+and fetch logs for a particular time range. A user may attempt to physically partition the data by both `org_id`
+and `date(timestamp)`. If there are 1K organization IDs and one year of data, such a physical partitioning scheme writes
+at least `365 days x 1K IDs = 365K` data files under 365K partitions. In most cases, the data can be highly skewed based
+on the organizations, with most organizations having little data and a handful of organizations having the majority of
+the data, so there can be many small data files. In such a case, the user may want to evolve the partitioning by
+using `org_id` only, without rewriting existing data, resulting in a physical layout of data like below:
+
+| Physical partition path      | File Name            | Min of datestr | Max of datestr | Note                    |
+|------------------------------|----------------------|----------------|----------------|-------------------------|
+| org_id=1/datestr=2022-10-01/ | base_file_1.parquet  | `2022-10-01`   | `2022-10-01`   | Old partitioning scheme |
+| org_id=1/datestr=2022-10-02/ | base_file_2.parquet  | `2022-10-02`   | `2022-10-02`   |                         |
+| org_id=2/datestr=2022-10-01/ | base_file_3.parquet  | `2022-10-01`   | `2022-10-01`   |                         |
+| org_id=3/datestr=2022-10-01/ | base_file_4.parquet  | `2022-10-01`   | `2022-10-01`   |                         |
+| ...                          | ...                  | ...            | ...            | ...                     |
+| org_id=1/                    | base_file_10.parquet | `2022-10-10`   | `2022-10-11`   | New partitioning scheme |
+| org_id=2/                    | base_file_11.parquet | `2022-10-10`   | `2022-10-15`   |                         |
+| ...                          | ...                  | ...            | ...            | ...                     |
+
+For the example above, even with the mix of old and new partitioning schemes, we should be able to effectively skip
+data based on the range of `datestr` for each file, regardless of how the files are stored under different physical
+partition paths in the table.
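A minimal sketch of this path-agnostic pruning (hypothetical stats layout, not Hudi's actual metadata format): the per-file `datestr` range drives the skipping, and the physical path is irrelevant.

```python
# Hypothetical per-file stats: full path -> (min, max) of `datestr`.
# Old hive-style paths and new org-only paths are mixed; pruning looks only at the stats.
file_stats = {
    "org_id=1/datestr=2022-10-01/base_file_1.parquet": ("2022-10-01", "2022-10-01"),
    "org_id=1/datestr=2022-10-02/base_file_2.parquet": ("2022-10-02", "2022-10-02"),
    "org_id=1/base_file_10.parquet": ("2022-10-10", "2022-10-11"),
    "org_id=2/base_file_11.parquet": ("2022-10-10", "2022-10-15"),
}

def skip_by_datestr(stats, lo, hi):
    """Keep files whose [min, max] datestr range overlaps the query range [lo, hi]."""
    return [path for path, (mn, mx) in stats.items() if mn <= hi and mx >= lo]

# Query: datestr between '2022-10-02' and '2022-10-10'.
# base_file_1 is skipped; files from both old and new schemes are kept.
kept = skip_by_datestr(file_stats, "2022-10-02", "2022-10-10")
print(sorted(kept))
```

Since ISO dates compare correctly as strings, the same interval-overlap test works across both partitioning schemes without any rewrite of old data.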
+
+### Use Case 3: Support for different indexes
+
+The functional index framework should be able to work with different index types, such as bloom index and column
+stats, and at the same time should be extensible enough to support any other secondary index, such as
+a [vector](https://www.pinecone.io/learn/vector-database/) [index](https://weaviate.io/developers/weaviate/concepts/vector-index),
+in the future. If we think about a very simple index on a column, it is a kind of functional index with the identity
+function `f(X) = X`. It is important to note that these are secondary indexes in the sense that they will be stored
+separately from the data, and not materialized with the data.
+
+## Goals and Non-Goals
+
+Based on the use cases we plan to support, we set the following goals and non-goals for this RFC.
+
+### Goals
+
+- Modular, easy-to-use indexing subsystem, with full SQL support to manage indexes.
+- Absorb partitioning into indexes and aggregate statistics at the storage partition level.
+- Support efficient data skipping with different indexing mechanisms.
+- Be engine-agnostic and language-agnostic.
+
+### Non-Goals
+
+- DO NOT remove physical partitioning, which remains an option for physically storing data in different folders and
+  for partition pruning. Viewing partitions as yet another index goes beyond the traditional view, as pointed out in
+  use case 2, and we will see how we can support logical partitioning and partition evolution simply with indexes.
+- DO NOT tackle the support of using these indexes on the write path in this RFC. That said, we will present a glimpse
+  of how that can be done for functional indexes in the Appendix below.
+- DO NOT build an expression engine. Building an expression engine in Hudi that unifies query predicate expressions
+  from systems like Spark, Presto, and others into a standardized format is a forward-thinking idea. This would
+  centralize the interpretation of queries for Hudi tables, leading to simpler compatibility with multiple query
+  engines. However, it is not a prerequisite for the first cut of functional indexes. An expression engine should be
+  discussed in another RFC.
+
+## Design and Implementation
+
+At a high level, **Functional Index** design principles are as follows:
+
+1. The user specifies the functional index, including the original data column and the expression applying a function
+   on the column(s), through **SQL** or Hudi's write config. Indexes can be created or dropped for Hudi tables at any
+   time.
+2. While table properties will be the source of truth about what indexes are available, the metadata about each
+   functional index is registered in a separate index metadata file, to keep track of the relationship between the data
+   column and the functional index.
+3. Each functional index will be a partition inside the Hudi metadata table (MT).
+4. No data is materialized to the data files for the functional index, i.e., the data files do not contain a new data
+   column corresponding to the functional index; this saves storage space.
+5. When the query engine makes a logical plan for a query, Hudi intercepts the predicates that relate to a functional
+   index, looks up the corresponding index in the MT, and applies data skipping based on one or more indexes.
+6. In order to be engine-agnostic, the design should not make assumptions about any particular engine.
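The flow through these principles can be sketched as a toy model (all names and data structures below are hypothetical, for illustration only, and do not reflect Hudi's actual classes or metadata layout): an index definition maps an expression to a metadata-table partition holding per-file stats of the transformed value, and query-time pruning consults those stats.

```python
# Toy model of principles 1-5. All names here are hypothetical.
# Index definitions: indexed expression -> MT partition holding its stats.
index_defs = {
    "DATE_FORMAT(timestamp, '%H')": "func_index_hour_of_day",
}

# Per-file [min, max] of the indexed expression, as stored in the MT partition.
mt_partitions = {
    "func_index_hour_of_day": {
        "base_file_13.parquet": ("12", "12"),
        "base_file_14.parquet": ("13", "13"),
        "base_file_15.parquet": ("14", "14"),
    }
}

def skip_files(predicate_expr, lo, hi):
    """If a functional index exists for the predicate's expression, prune by its stats."""
    partition = index_defs.get(predicate_expr)
    if partition is None:
        return None  # no index: the engine falls back to scanning all candidate files
    stats = mt_partitions[partition]
    return sorted(f for f, (mn, mx) in stats.items() if mn <= hi and mx >= lo)

# Predicate: DATE_FORMAT(timestamp, '%H') between '12' and '13'
print(skip_files("DATE_FORMAT(timestamp, '%H')", "12", "13"))
```

Note that nothing is materialized in the data files themselves; the transformed values exist only as stats in the index partition, matching principle 4.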
+
+### Components
+
+We discuss the design and implementation details of each component to realize the functional index.
+
+#### SQL
+
+A new keyword `FUNCTION` is introduced to indicate that a functional index is to be created. This keyword is
+ANSI-compliant and also present in [spark-sql](https://spark.apache.org/docs/latest/sql-ref-ansi-compliance.html).
+
+```sql
+-- PROPOSED SYNTAX WITH FUNCTION KEYWORD --
+CREATE [FUNCTION] INDEX index_name ON table_name [ USING index_type ] ( { column_name | expression } );
+
+-- Examples --
+CREATE FUNCTION INDEX last_name_idx ON employees (UPPER(last_name)); -- functional index using column stats for UPPER(last_name)
+CREATE FUNCTION INDEX datestr ON hudi_table (DATE_FORMAT(timestamp, '%Y-%m-%d')); -- functional index using column stats for DATE_FORMAT(timestamp, '%Y-%m-%d')
+CREATE INDEX city_id_idx ON hudi_table (city_id); -- usual column stats within MT column_stats partition
+CREATE FUNCTION INDEX hour_of_day ON hudi_table USING BITMAP (DATE_FORMAT(timestamp, '%H')); -- functional index using bitmap for DATE_FORMAT(timestamp, '%H')
+CREATE FUNCTION INDEX income_idx ON employees (salary + (salary*commission_pct)); -- functional index using column stats for given expression
+
+-- NO CHANGE IN DROP INDEX --
+DROP INDEX last_name_idx;
+```
+
+`index_name` - Required; should be validated by the parser. The name will be used for the partition name in the MT.
+
+`index_type` - Optional; `column_stats` if none is provided and there are no functions or expressions in the command.
+Valid options could be `BITMAP`, `COLUMN_STATS`, `LUCENE`, etc. If `index_type` is not provided and there are functions
+or expressions in the command, then a functional index using column stats will be created.
+
+`expression` - Simple scalar expressions or SQL functions.
+
+#### Functional Index Metadata
+
+For each functional index, store separate metadata with the index details. This metadata should be efficiently
+loadable. One option is to store the below metadata in `hoodie.properties`. This way, all index metadata can be loaded
+with the table config. But it would be better to not overload the table config. Let `hoodie.properties` still hold the list of indexes (MT

Review Comment:
   changed to `hoodie.table.index.defs.path` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
