This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new a0bf38eb27d [DOCS] Refine quickstart docs further for 1.0 (#12420)
a0bf38eb27d is described below
commit a0bf38eb27d9e088929af1f1f414448f5e5760b1
Author: Lokesh Jain <[email protected]>
AuthorDate: Sat Dec 7 22:10:17 2024 +0530
[DOCS] Refine quickstart docs further for 1.0 (#12420)
* [DOCS] Refine quickstart docs further for 1.0
* Add merge into partial update in quickstart
* Add merge into partial updates to sql dml
* Add merge mode example
* Add index acceleration related and MDT query related docs
* Address review comments
* Add Flink NBCC example
* Add snapshot query example for event time and custom merge modes
* Add note on checkpoint translation
* Fix caveats
* Revert metadata page changes
* First pass of review comments and improvements on top
* fix typo
* Address review comments
* Address review comments
* Adding custom merger docs
---------
Co-authored-by: Lokesh Jain <[email protected]>
Co-authored-by: Sagar Sumit <[email protected]>
Co-authored-by: vinoth chandar <[email protected]>
---
website/docs/gcp_bigquery.md | 1 -
website/docs/indexes.md | 7 +-
website/docs/metadata_indexing.md | 52 ++--
website/docs/performance.md | 6 +-
website/docs/quick-start-guide.md | 181 +++++++-----
website/docs/sql_ddl.md | 311 ++++++++++++++-------
website/docs/sql_dml.md | 47 +++-
website/docs/sql_queries.md | 306 ++++++++++++++++++++
.../bloom-filter-expression-index-with-pruning.png | Bin 0 -> 39928 bytes
...oom-filter-expression-index-without-pruning.png | Bin 0 -> 44612 bytes
.../column-stat-expression-index-with-pruning.png | Bin 0 -> 40126 bytes
...olumn-stat-expression-index-without-pruning.png | Bin 0 -> 44958 bytes
.../images/partition-stat-index-with-pruning.png | Bin 0 -> 39043 bytes
.../partition-stat-index-without-pruning.png | Bin 0 -> 44779 bytes
.../assets/images/secondary-index-with-pruning.png | Bin 0 -> 39921 bytes
.../images/secondary-index-without-pruning.png | Bin 0 -> 44916 bytes
16 files changed, 697 insertions(+), 214 deletions(-)
diff --git a/website/docs/gcp_bigquery.md b/website/docs/gcp_bigquery.md
index cd61ec0eef0..9f7b12dbeb3 100644
--- a/website/docs/gcp_bigquery.md
+++ b/website/docs/gcp_bigquery.md
@@ -99,5 +99,4 @@ spark-submit --master yarn \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.write.drop.partition.columns=true \
--hoodie-conf hoodie.partition.metafile.use.base.format=true \
---hoodie-conf hoodie.metadata.enable=true \
```
diff --git a/website/docs/indexes.md b/website/docs/indexes.md
index 1f1c039b97a..512242ba811 100644
--- a/website/docs/indexes.md
+++ b/website/docs/indexes.md
@@ -65,10 +65,9 @@ the [metadata table](/docs/next/metadata#metadata-table), significantly improving
### Expression Index
- An [expression
index](https://github.com/apache/hudi/blob/3789840be3d041cbcfc6b24786740210e4e6d6ac/rfc/rfc-63/rfc-63.md)
- is an index on a function of a column. If a query has a predicate on a
function of a column, the functional index can
- be used to speed up the query. Functional index is stored in *func_index_*
prefixed partitions (one for each
- function) under metadata table. Functional index can be created using SQL
syntax. Please checkout SQL DDL
+ An [expression index](https://github.com/apache/hudi/blob/3789840be3d041cbcfc6b24786740210e4e6d6ac/rfc/rfc-63/rfc-63.md) is an index on a function of a column. If a query has a predicate on a function of a column, the expression index can
+ be used to speed up the query. An expression index is stored in *expr_index_* prefixed partitions (one for each
+ expression index) under the metadata table. An expression index can be created using SQL syntax. Please check out the SQL DDL
docs [here](/docs/next/sql_ddl#create-functional-index-experimental) for
more details.
### Secondary Index
diff --git a/website/docs/metadata_indexing.md
b/website/docs/metadata_indexing.md
index e6578275a69..1d7a6826820 100644
--- a/website/docs/metadata_indexing.md
+++ b/website/docs/metadata_indexing.md
@@ -11,27 +11,27 @@ of Hudi depends on the metadata table. Different types of
index, from `files` in
to `column_stats` index for data skipping, are part of the metadata table. A
fundamental tradeoff in any data system
that supports indices is to balance the write throughput with index updates. A
brute-force way is to lock out the writes
while indexing. Hudi supports index creation using SQL, Datasource as well as
async indexing. However, very large tables
-can take hours to index. This is where Hudi's novel asynchronous metadata
indexing comes into play. Indexes in Hudi are
-created in two phases and uses a mix of optimistic concurrency control and
log-based concurrency control models. The two
-phase approach ensures that the other writers are unblocked.
+can take hours to index. This is where Hudi's novel concurrent indexing comes
into play.
+
+## Concurrent Indexing
-- Scheduling - This is the first phase which schedules an indexing plan and is
protected by a lock. Indexing plan considers all the completed commits upto
indexing instant.
-- Execution - This phase creates the index files as mentioned in the index
plan. At the end of the phase Hudi ensures the completed commits after indexing
instant used already created index plan to add corresponding index metadata.
This check is protected by a metadata table lock and in case of failures
indexing is aborted.
+Indexes in Hudi are created in two phases and use a mix of optimistic concurrency control and multi-version concurrency control techniques. The
+two-phase approach ensures that other writers are unblocked.
-We can now create different metadata indices, including `files`,
`bloom_filters`, `column_stats`, `partition_stats` and `record_index`
-asynchronously in Hudi, which are then used by readers and writers to improve
performance. Being able to index without blocking writing
-has two benefits,
+- **Scheduling & Planning**: This is the first phase, which schedules an indexing plan and is protected by a lock. The indexing plan considers all the completed commits up to the indexing instant.
+- **Execution**: This phase creates the index files as laid out in the indexing plan. At the end of this phase, Hudi verifies that commits completed after the indexing instant used the already created indexing plan to add the corresponding index metadata. This check is protected by a metadata table lock, and in case of failure, indexing is aborted.
-- improved write latency
-- reduced resource wastage due to contention between writing and indexing.
+We can now create different indexes and metadata, including `bloom_filters`, `column_stats`, `partition_stats`, `record_index`, `secondary_index`
+and `expression_index` asynchronously in Hudi. Being able to index without blocking writing ensures that write performance is unaffected and that no
+additional manual maintenance is necessary to add or remove indexes. It also reduces resource wastage by avoiding contention between writing and indexing.
-In this document, we will learn how to create indexes using SQL, Datasource
and how to setup asynchronous metadata indexing.
-To learn more about the design of asynchronous indexing feature, please check
out [this blog](https://www.onehouse.ai/blog/asynchronous-indexing-using-hudi).
+Please refer to the section [Setup Async Indexing](#setup-async-indexing) for more details on how to set up
+asynchronous indexing. To learn more about the design of the asynchronous indexing feature, please check out [this blog](https://www.onehouse.ai/blog/asynchronous-indexing-using-hudi).
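As a rough conceptual sketch of the two-phase approach described above (all names and data structures here are hypothetical and illustrative, not Hudi's actual classes):

```python
import threading

# Illustrative sketch of two-phase concurrent indexing: scheduling under a
# lock, execution without blocking writers, and a final validation check.
metadata_lock = threading.Lock()

def schedule_indexing(completed_commits):
    """Phase 1: under a lock, capture an indexing plan over completed commits."""
    with metadata_lock:
        return {"indexing_instant": max(completed_commits),
                "commits_to_index": sorted(completed_commits)}

def execute_indexing(plan, all_commits, commits_that_applied_plan):
    """Phase 2: build index files, then validate late commits under the lock."""
    # Index creation itself runs without holding the lock, so writers proceed.
    index_files = ["index_%d" % c for c in plan["commits_to_index"]]
    with metadata_lock:
        late = [c for c in all_commits if c > plan["indexing_instant"]]
        # Commits completed after the indexing instant must have used the
        # already created plan to add their index metadata; otherwise abort.
        if any(c not in commits_that_applied_plan for c in late):
            raise RuntimeError("aborting indexing: a late commit missed the plan")
    return index_files

plan = schedule_indexing([1, 2, 3])
files = execute_indexing(plan, [1, 2, 3, 4], {4})  # commit 4 applied the plan
```

Writers only contend on the lock at the two short critical sections, which is what keeps indexing from blocking ingestion.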
## Index Creation Using SQL
Currently, indexes like the secondary index, expression index and record index can be created using the SQL create index command.
-For more information on these indexes please refer [metadata
section](https://hudi.apache.org/docs/metadata/#metadata-table-indices)
+For more information on these indexes, please refer to the [metadata section](/docs/next/metadata/#types-of-table-metadata)
**Examples**
```sql
@@ -41,14 +41,14 @@ CREATE INDEX record_index ON hudi_indexed_table (uuid);
-- Create secondary index on rider column.
CREATE INDEX idx_rider ON hudi_indexed_table (rider);
--- Create expression index by performing transformation on driver and city
column
+-- Create expression indexes by performing transformations on the ts and driver columns
-- The index is created on the transformed column. Here column stats index is
created on ts column
--- and bloom filters index is created on city column.
-CREATE INDEX idx_column_driver ON hudi_indexed_table USING column_stats(rider)
OPTIONS(expr='upper');
-CREATE INDEX idx_bloom_city ON hudi_indexed_table USING bloom_filters(city)
OPTIONS(expr='identity');
+-- and bloom filters index is created on driver column.
+CREATE INDEX idx_column_ts ON hudi_indexed_table USING column_stats(ts)
OPTIONS(expr='from_unixtime', format = 'yyyy-MM-dd');
+CREATE INDEX idx_bloom_driver ON hudi_indexed_table USING
bloom_filters(driver) OPTIONS(expr='identity');
```
-For more information on index creation using SQL refer [SQL
DDL](https://hudi.apache.org/docs/next/sql_ddl#create-index)
+For more information on index creation using SQL, refer to [SQL DDL](/docs/next/sql_ddl#create-index)
## Index Creation Using Datasource
@@ -76,6 +76,10 @@ hoodie.metadata.index.bloom.filter.enable=true
## Setup Async Indexing
+In this example, we will write continuously using Hudi Streamer and create an index in parallel. The index creation
+is done using HoodieIndexer, so that the schedule and execute phases of indexing are clearly visible. The asynchronous
+indexing configurations can also be used with Datasource and SQL based configs to create indexes.
+
First, we will generate a continuous workload. In the below example, we are
going to start a [Hudi
Streamer](/docs/hoodie_streaming_ingestion#hudi-streamer) which will
continuously write data
from raw parquet to Hudi table. We used the widely available [NY Taxi
dataset](https://registry.opendata.aws/nyc-tlc-trip-records-pds/), whose setup
details are as below:
<details>
@@ -129,7 +133,7 @@ us schedule the indexing for COLUMN_STATS index. First we
need to define a prope
As mentioned before, metadata indices are pluggable. One can add any index at
any point in time depending on changing
business requirements. Some configurations to enable particular indices are
listed below. Currently, available indices under
-metadata table can be explored
[here](/docs/next/metadata#metadata-table-indices) along with
[configs](/docs/next/metadata#enable-hudi-metadata-table-and-multi-modal-index-in-write-side)
+metadata table can be explored
[here](/docs/next/metadata/#types-of-table-metadata) along with
[configs](/docs/next/metadata#enable-hudi-metadata-table-and-multi-modal-index-in-write-side)
to enable them. The full set of metadata configurations can be explored
[here](/docs/next/configurations/#Metadata-Configs).
:::note
@@ -138,8 +142,7 @@ configuration below.
:::
```
-# ensure that both metadata and async indexing is enabled as below two configs
-hoodie.metadata.enable=true
+# ensure that async indexing is enabled
hoodie.metadata.index.async=true
# enable column_stats index config
hoodie.metadata.index.column.stats.enable=true
@@ -248,15 +251,10 @@ spark-submit \
The asynchronous indexing feature is still evolving. A few points to note from a deployment perspective while running the indexer:
-- While an index can be created concurrently with ingestion, it cannot be
dropped concurrently. Please stop all writers
- before dropping an index.
- Files index is created by default as long as the metadata table is enabled.
- Trigger indexing for one metadata partition (or index type) at a time.
-- If an index is enabled via async HoodieIndexer, then ensure that index is
also enabled in configs corresponding to regular ingestion writers. Otherwise,
metadata writer will
+- If an index is enabled via async indexing, then ensure that the index is also enabled in the configs of the regular ingestion writers. Otherwise, the metadata writer will
think that the particular index was disabled and clean up the metadata partition.
-- In the case of multi-writers, enable async index and specific index config
for all writers.
-- Unlike other table services like compaction and clustering, where we have a
separate configuration to run inline, there is no such inline config here.
- For example, if async indexing is disabled and metadata is enabled along
with column stats index type, then both files and column stats index will be
created synchronously with ingestion.
Some of these limitations will be removed in the upcoming releases. Please
follow [HUDI-2488](https://issues.apache.org/jira/browse/HUDI-2488) for
developments on this feature.
diff --git a/website/docs/performance.md b/website/docs/performance.md
index 5bb7f935a1f..0663535c07d 100644
--- a/website/docs/performance.md
+++ b/website/docs/performance.md
@@ -128,6 +128,6 @@ If you're planning on enabling Column Stats Index for
already existing table, pl
To enable Data Skipping in your queries make sure to set following properties
to `true` (on the read path):
- - `hoodie.enable.data.skipping` (to enable Data Skipping)
- - `hoodie.metadata.enable` (to enable Metadata Table use on the read path)
- - `hoodie.metadata.index.column.stats.enable` (to enable Column Stats Index
use on the read path)
+ - `hoodie.enable.data.skipping` (to control data skipping, enabled by
default)
+ - `hoodie.metadata.enable` (to enable metadata table use on the read path,
enabled by default)
+ - `hoodie.metadata.index.column.stats.enable` (to enable column stats index
use on the read path)
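To enable data skipping on the read path in a Spark SQL session, the properties above could be set as follows (a sketch; the property names are taken from the list above):

```sql
-- Read-path settings for data skipping (the first two are enabled by default)
SET hoodie.enable.data.skipping=true;
SET hoodie.metadata.enable=true;
SET hoodie.metadata.index.column.stats.enable=true;
```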
diff --git a/website/docs/quick-start-guide.md
b/website/docs/quick-start-guide.md
index ebc1c6271f2..f4fec42dc84 100644
--- a/website/docs/quick-start-guide.md
+++ b/website/docs/quick-start-guide.md
@@ -415,72 +415,6 @@ spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key,
_hoodie_partition_pat
</Tabs
>
-## Index data {#indexing}
-
-<Tabs
-groupId="programming-language"
-defaultValue="sparksql"
-values={[
-{ label: 'Spark SQL', value: 'sparksql', },
-]}
->
-
-<TabItem value="sparksql">
-
-```sql
--- Create a table with primary key
-CREATE TABLE hudi_indexed_table (
- ts BIGINT,
- uuid STRING,
- rider STRING,
- driver STRING,
- fare DOUBLE,
- city STRING
-) USING HUDI
-options(
- primaryKey ='uuid',
- hoodie.datasource.write.payload.class =
"org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
-)
-PARTITIONED BY (city);
-
-INSERT INTO hudi_indexed_table
-VALUES
-(1695159649,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
-(1695091554,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70
,'san_francisco'),
-(1695046462,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90
,'san_francisco'),
-(1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
-(1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'
),
-(1695376420,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40
,'sao_paulo' ),
-(1695173887,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06
,'chennai' ),
-(1695115999,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
-
--- Create bloom filter expression index on city column
-CREATE INDEX idx_bloom_city ON hudi_indexed_table USING bloom_filters(city)
OPTIONS(expr='identity');
--- It would show bloom filter expression index
-SHOW INDEXES FROM hudi_indexed_table;
--- Query on city column would prune the data using the idx_bloom_city index
-SELECT uuid, rider FROM hudi_indexed_table WHERE city = 'san_francisco';
-
--- Create column stat expression index on ts column
-CREATE INDEX idx_column_driver ON hudi_indexed_table USING column_stats(rider)
OPTIONS(expr='upper');
--- Shows both expression indexes
-SHOW INDEXES FROM hudi_indexed_table;
--- Query on ts column would prune the data using the idx_column_ts index
-SELECT * FROM hudi_indexed_table WHERE upper(driver) = 'DRIVER-S';
-
--- Create secondary index on rider column
-CREATE INDEX record_index ON hudi_indexed_table (uuid);
-CREATE INDEX idx_rider ON hudi_indexed_table (rider);
--- Expression index and secondary index should show up
-SHOW INDEXES FROM hudi_indexed_table;
--- Query on rider column would leverage the secondary index idx_rider
-SELECT * FROM hudi_indexed_table WHERE rider = 'rider-E';
-```
-</TabItem>
-
-</Tabs
->
-
## Update data {#upserts}
Hudi tables can be updated by streaming in a DataFrame or using a standard
UPDATE statement.
@@ -627,13 +561,28 @@ WHEN NOT MATCHED THEN INSERT *
:::info Key requirements
1. For a Hudi table with user defined primary record [keys](#keys), the join
condition is expected to contain the primary keys of the table.
-For a Hudi table with Hudi generated primary keys, the join condition can be
on any arbitrary data columns.
-2. For Merge-On-Read tables, partial column updates are not yet supported,
i.e. **all columns** need to be SET from a
-MERGE statement either using `SET *` or using `SET column1 = expression1 [,
column2 = expression2 ...]`.
+For a Hudi table with Hudi generated primary keys, the join condition can be
on any arbitrary data columns.
:::
</TabItem>
</Tabs>
+## Merging Data (Partial Updates) {#merge-partial-update}
+
+Partial updates write only the updated columns instead of the full record. This is useful when you have hundreds of
+columns and only a few of them are updated. It reduces both write costs and storage costs.
+The `MERGE INTO` statement above can be modified to use partial updates as shown below.
+
+```sql
+MERGE INTO hudi_table AS target
+USING fare_adjustment AS source
+ON target.uuid = source.uuid
+WHEN MATCHED THEN UPDATE SET fare = source.fare
+WHEN NOT MATCHED THEN INSERT *
+;
+```
+
+Notice that instead of `UPDATE SET *`, we are updating only the `fare` column.
+
## Delete data {#deletes}
Delete operation removes the records specified from the table. For example,
this code snippet deletes records
@@ -711,6 +660,91 @@ Notice that the save mode is again `Append`.
</Tabs
>
+## Index data {#indexing}
+
+<Tabs
+groupId="programming-language"
+defaultValue="sparksql"
+values={[
+{ label: 'Spark SQL', value: 'sparksql', },
+]}
+>
+
+<TabItem value="sparksql">
+
+```sql
+-- Create a table with primary key
+CREATE TABLE hudi_indexed_table (
+ ts BIGINT,
+ uuid STRING,
+ rider STRING,
+ driver STRING,
+ fare DOUBLE,
+ city STRING
+) USING HUDI
+options(
+ primaryKey ='uuid',
+ hoodie.write.record.merge.mode = "COMMIT_TIME_ORDERING"
+)
+PARTITIONED BY (city);
+
+INSERT INTO hudi_indexed_table
+VALUES
+(1695159649,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
+(1695091554,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70
,'san_francisco'),
+(1695046462,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90
,'san_francisco'),
+(1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
+(1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'
),
+(1695376420,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40
,'sao_paulo' ),
+(1695173887,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06
,'chennai' ),
+(1695115999,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
+
+-- Create bloom filter expression index on driver column
+CREATE INDEX idx_bloom_driver ON hudi_indexed_table USING
bloom_filters(driver) OPTIONS(expr='identity');
+-- It would show bloom filter expression index
+SHOW INDEXES FROM hudi_indexed_table;
+-- Query on driver column would prune the data using the idx_bloom_driver index
+SELECT uuid, rider FROM hudi_indexed_table WHERE driver = 'driver-S';
+
+-- Create column stat expression index on ts column
+CREATE INDEX idx_column_ts ON hudi_indexed_table USING column_stats(ts)
OPTIONS(expr='from_unixtime', format = 'yyyy-MM-dd');
+-- Shows both expression indexes
+SHOW INDEXES FROM hudi_indexed_table;
+-- Query on ts column would prune the data using the idx_column_ts index
+SELECT * FROM hudi_indexed_table WHERE from_unixtime(ts, 'yyyy-MM-dd') =
'2023-09-24';
+
+-- Create a record index on the uuid column and a secondary index on the rider column
+CREATE INDEX record_index ON hudi_indexed_table (uuid);
+CREATE INDEX idx_rider ON hudi_indexed_table (rider);
+SET hoodie.metadata.record.index.enable=true;
+-- Expression index and secondary index should show up
+SHOW INDEXES FROM hudi_indexed_table;
+-- Query on rider column would leverage the secondary index idx_rider
+SELECT * FROM hudi_indexed_table WHERE rider = 'rider-E';
+
+-- Update a record and query the table based on indexed columns
+UPDATE hudi_indexed_table SET rider = 'rider-B', driver = 'driver-N', ts =
'1697516137' WHERE rider = 'rider-A';
+-- Data skipping would be performed using column stat expression index
+SELECT uuid, rider FROM hudi_indexed_table WHERE from_unixtime(ts,
'yyyy-MM-dd') = '2023-10-17';
+-- Data skipping would be performed using bloom filter expression index
+SELECT * FROM hudi_indexed_table WHERE driver = 'driver-N';
+-- Data skipping would be performed using secondary index
+SELECT * FROM hudi_indexed_table WHERE rider = 'rider-B';
+
+-- Drop all the indexes
+DROP INDEX record_index on hudi_indexed_table;
+DROP INDEX secondary_index_idx_rider on hudi_indexed_table;
+DROP INDEX expr_index_idx_bloom_driver on hudi_indexed_table;
+DROP INDEX expr_index_idx_column_ts on hudi_indexed_table;
+-- No indexes should show up for the table
+SHOW INDEXES FROM hudi_indexed_table;
+
+SET hoodie.metadata.record.index.enable=false;
+```
+</TabItem>
+
+</Tabs
+>
## Time Travel Query {#timetravel}
@@ -1138,13 +1172,16 @@ Also if a record key is configured, then it's also
advisable to specify a precom
multiple records with the same key. See section below.
:::
-## Ordering Field
+## Merge Modes
Hudi also allows users to specify a _precombine_ field, which will be used to
order and resolve conflicts between multiple versions of the same record. This
is very important for
-use-cases like applying database CDC logs to a Hudi table, where a given
record may be appear multiple times in the source data due to repeated upstream
updates.
+use-cases like applying database CDC logs to a Hudi table, where a given
record may appear multiple times in the source data due to repeated upstream
updates.
Hudi also uses this mechanism to support out-of-order data arrival into a
table, where records may need to be resolved in a different order than their
commit time.
-For e.g using a _created_at_ timestamp field as the precombine field will
prevent older versions of a record from overwriting newer ones or being exposed
to queries, even
+For example, using a _created_at_ timestamp field as the precombine field will prevent older versions of a record from overwriting newer ones or being exposed to queries, even
if they are written at a later commit time to the table. This is one of the
key features that makes Hudi best suited for dealing with streaming data.
+To enable different merge semantics, Hudi supports [merge modes](/docs/next/record_merger). Commit time and event time based merge modes are supported out of the box.
+Users can also define their own custom merge strategies; see [here](/docs/next/sql_ddl#create-table-with-record-merge-mode).
+
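A minimal sketch of how the two built-in merge modes resolve two versions of the same record (illustrative Python, not Hudi code; the field names follow the quickstart examples):

```python
# Resolve two versions of the same record under the built-in merge modes.
# "old" was committed first; "new" arrived in a later commit.
def merge(old, new, mode):
    if mode == "EVENT_TIME_ORDERING":
        # The larger event time (precombine field) wins, regardless of commit order.
        return new if new["ts"] >= old["ts"] else old
    if mode == "COMMIT_TIME_ORDERING":
        # The later-committed record always wins.
        return new
    raise ValueError("unknown merge mode: %s" % mode)

current = {"uuid": "a", "ts": 100, "fare": 20.0}
late_arriving = {"uuid": "a", "ts": 90, "fare": 15.0}  # older event, later commit
assert merge(current, late_arriving, "EVENT_TIME_ORDERING") == current
assert merge(current, late_arriving, "COMMIT_TIME_ORDERING") == late_arriving
```

Under event time ordering, the late-arriving record with the older `ts` is discarded; under commit time ordering, it would overwrite the current record.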
<Tabs
groupId="programming-language"
defaultValue="scala"
diff --git a/website/docs/sql_ddl.md b/website/docs/sql_ddl.md
index 30f21e61bff..a809e37944f 100644
--- a/website/docs/sql_ddl.md
+++ b/website/docs/sql_ddl.md
@@ -103,6 +103,49 @@ TBLPROPERTIES (
);
```
+### Create table with merge modes {#create-table-with-record-merge-mode}
+
+Hudi supports different [record merge modes](/docs/next/record_merger) to handle the merging of incoming records with existing
+records. To create a table with a specific record merge mode, you can set the `recordMergeMode` option.
+
+```sql
+CREATE TABLE IF NOT EXISTS hudi_table_merge_mode (
+ id INT,
+ name STRING,
+ ts LONG,
+ price DOUBLE
+) USING hudi
+TBLPROPERTIES (
+ type = 'mor',
+ primaryKey = 'id',
+ precombineField = 'ts',
+ recordMergeMode = 'EVENT_TIME_ORDERING'
+)
+LOCATION 'file:///tmp/hudi_table_merge_mode/';
+```
+
+With `EVENT_TIME_ORDERING`, the record with the larger event time (`precombineField`) overwrites the record with the
+smaller event time on the same key, regardless of the transaction's commit time. Users can set the `CUSTOM` mode to provide their own
+merge logic. With the `CUSTOM` merge mode, you can provide a custom class that implements the merge logic. The interfaces
+to implement are explained in detail [here](/docs/next/record_merger#custom).
+
+```sql
+CREATE TABLE IF NOT EXISTS hudi_table_merge_mode_custom (
+ id INT,
+ name STRING,
+ ts LONG,
+ price DOUBLE
+) USING hudi
+TBLPROPERTIES (
+ type = 'mor',
+ primaryKey = 'id',
+ precombineField = 'ts',
+ recordMergeMode = 'CUSTOM',
+ 'hoodie.record.merge.strategy.id' = '<unique-uuid>'
+)
+LOCATION 'file:///tmp/hudi_table_merge_mode_custom/';
+```
+
### Create table from an external location
Often, Hudi tables are created from streaming writers like the [streamer
tool](/docs/hoodie_streaming_ingestion#hudi-streamer), which
may later need some SQL statements to run on them. You can create an External
table using the `location` statement.
@@ -192,12 +235,17 @@ AS SELECT * FROM parquet_table;
### Create Index
-Hudi supports creating and dropping indexes, including functional indexes, on
a table. For more information on different
-type of indexes please refer [metadata
section](https://hudi.apache.org/docs/metadata/#metadata-table-indices).
Secondary
-index, expression index and record index can be created using SQL create index
command.
+Hudi supports creating and dropping different types of indexes on a table. For more information on the different
+types of indexes, please refer to [multi-modal indexing](/docs/next/indexes#multi-modal-indexing). Secondary
+indexes, expression indexes and record indexes can be created using the SQL create index command.
-**Syntax**
```sql
+-- Create Index
+CREATE INDEX [IF NOT EXISTS] index_name ON [TABLE] table_name
+[USING index_type]
+(column_name1 [OPTIONS(key1=value1, key2=value2, ...)], column_name2
[OPTIONS(key1=value1, key2=value2, ...)], ...)
+[OPTIONS (key1=value1, key2=value2, ...)]
+
-- Record index syntax
CREATE INDEX indexName ON tableIdentifier (primaryKey1 [, primaryKey2 ...]);
@@ -207,8 +255,19 @@ CREATE INDEX indexName ON tableIdentifier (nonPrimaryKey);
-- Expression Index Syntax
CREATE INDEX indexName ON tableIdentifier USING column_stats(col)
OPTIONS(expr='expr_val', format='format_val');
CREATE INDEX indexName ON tableIdentifier USING bloom_filters(col)
OPTIONS(expr='expr_val');
+
+-- Drop Index
+DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name
```
+- `index_name` is the name of the index to be created or dropped.
+- `table_name` is the name of the table on which the index is created or
dropped.
+- `index_type` is the type of the index to be created. Currently, only `column_stats` and `bloom_filters` are supported.
+  If the `USING ...` clause is omitted, a secondary record index is created.
+- `column_name` is the name of the column on which the index is created.
+
+Both the index and the column on which the index is created can be qualified with options in the form of key-value pairs.
+
**Examples**
```sql
-- Create a table with primary key
@@ -222,90 +281,65 @@ CREATE TABLE hudi_indexed_table (
) USING HUDI
options(
primaryKey ='uuid',
- hoodie.datasource.write.payload.class =
"org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
+ hoodie.write.record.merge.mode = "COMMIT_TIME_ORDERING"
)
PARTITIONED BY (city);
+-- Add some data.
INSERT INTO hudi_indexed_table
VALUES
-(1695159649,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
-(1695091554,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70
,'san_francisco'),
-(1695046462,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90
,'san_francisco'),
-(1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
-(1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'
),
-(1695376420,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40
,'sao_paulo' ),
-(1695173887,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06
,'chennai' ),
-(1695115999,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
-
--- Create bloom filter expression index on city column
-CREATE INDEX idx_bloom_city ON hudi_indexed_table USING bloom_filters(city)
OPTIONS(expr='identity');
+ ...
+
+-- Create bloom filter expression index on driver column
+CREATE INDEX idx_bloom_driver ON hudi_indexed_table USING
bloom_filters(driver) OPTIONS(expr='identity');
-- It would show bloom filter expression index
SHOW INDEXES FROM hudi_indexed_table;
--- Query on city column would prune the data using the idx_bloom_city index
-SELECT uuid, rider FROM hudi_indexed_table WHERE city = 'san_francisco';
+-- Query on driver column would prune the data using the idx_bloom_driver index
+SELECT uuid, rider FROM hudi_indexed_table WHERE driver = 'driver-S';
-- Create column stat expression index on ts column
-CREATE INDEX idx_column_driver ON hudi_indexed_table USING column_stats(rider)
OPTIONS(expr='upper');
+CREATE INDEX idx_column_ts ON hudi_indexed_table USING column_stats(ts)
OPTIONS(expr='from_unixtime', format = 'yyyy-MM-dd');
-- Shows both expression indexes
SHOW INDEXES FROM hudi_indexed_table;
-- Query on ts column would prune the data using the idx_column_ts index
-SELECT * FROM hudi_indexed_table WHERE upper(driver) = 'DRIVER-S';
+SELECT * FROM hudi_indexed_table WHERE from_unixtime(ts, 'yyyy-MM-dd') =
'2023-09-24';
-- Create secondary index on rider column
CREATE INDEX record_index ON hudi_indexed_table (uuid);
CREATE INDEX idx_rider ON hudi_indexed_table (rider);
+SET hoodie.metadata.record.index.enable=true;
-- Expression index and secondary index should show up
SHOW INDEXES FROM hudi_indexed_table;
-- Query on rider column would leverage the secondary index idx_rider
SELECT * FROM hudi_indexed_table WHERE rider = 'rider-E';
-```
-
-:::note
-Creating indexes through SQL is in preview in version 1.0.0-beta only. It will
be generally available in version 1.0.0.
-Please report any issues you find either via [GitHub
issues](https://github.com/apache/hudi/issues) or creating a
[JIRA](https://issues.apache.org/jira/projects/HUDI/issues).
-:::
-
-```sql
--- Create Index
-CREATE INDEX [IF NOT EXISTS] index_name ON [TABLE] table_name
-[USING index_type]
-(column_name1 [OPTIONS(key1=value1, key2=value2, ...)], column_name2
[OPTIONS(key1=value1, key2=value2, ...)], ...)
-[OPTIONS (key1=value1, key2=value2, ...)]
--- Drop Index
-DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name
```
-- `index_name` is the name of the index to be created or dropped.
-- `table_name` is the name of the table on which the index is created or
dropped.
-- `index_type` is the type of the index to be created. Currently, only
`files`, `column_stats` and `bloom_filters` is supported.
-- `column_name` is the name of the column on which the index is created.
-- Both index and column on which the index is created can be qualified with
some options in the form of key-value pairs.
- We will see this with an example of functional index below.
+### Create Expression Index
-:::note
-Except for the `files`, `column_stats`, `bloom_filters` and `record_index`,
all other indexes are experimental. We
-encourage users to try out these features on new tables and provide feedback.
Below, we have also listed current
-limitations of these indexes.
-:::
-
-#### Create Functional Index (Experimental)
-
-A [functional
index](https://github.com/apache/hudi/blob/00ece7bce0a4a8d0019721a28049723821e01842/rfc/rfc-63/rfc-63.md)
-is an index on a function of a column. It is a new addition to Hudi's
[multi-modal
indexing](https://hudi.apache.org/blog/2022/05/17/Introducing-Multi-Modal-Index-for-the-Lakehouse-in-Apache-Hudi)
-subsystem which provides faster access method and also absorb partitioning as
part of the indexing system. Let us see
-some examples of creating a functional index.
+An [expression index](https://github.com/apache/hudi/blob/00ece7bce0a4a8d0019721a28049723821e01842/rfc/rfc-63/rfc-63.md) is an index on a function of a column.
+It is a new addition to Hudi's [multi-modal indexing](https://hudi.apache.org/blog/2022/05/17/Introducing-Multi-Modal-Index-for-the-Lakehouse-in-Apache-Hudi)
+subsystem. Expression indexes can be used to implement logical partitioning of a table, by creating `column_stats` indexes
+on an expression of a column. For example, an expression index extracting a date from a timestamp field can effectively implement
+date-based partitioning and provide the same benefits to queries, even if the physical layout is different.
```sql
--- Create a functional index on the column `ts` (unix epoch) of the table
`hudi_table` using the function `from_unixtime` with the format `yyyy-MM-dd`
-CREATE INDEX IF NOT EXISTS ts_datestr ON hudi_table USING column_stats(ts)
OPTIONS(func='from_unixtime', format='yyyy-MM-dd');
--- Create a functional index on the column `ts` (timestamp in yyyy-MM-dd
HH:mm:ss) of the table `hudi_table` using the function `hour`
-CREATE INDEX ts_hour ON hudi_table USING column_stats(ts) options(func='hour');
+-- Create an expression index on the column `ts` (unix epoch) of the table
`hudi_table` using the function `from_unixtime` with the format `yyyy-MM-dd`
+CREATE INDEX IF NOT EXISTS ts_datestr ON hudi_table
+ USING column_stats(ts)
+ OPTIONS(expr='from_unixtime', format='yyyy-MM-dd');
+-- Create an expression index on the column `ts` (timestamp in yyyy-MM-dd HH:mm:ss) of the table `hudi_table` using the function `hour`
+CREATE INDEX ts_hour ON hudi_table
+ USING column_stats(ts)
+ options(expr='hour');
```
-Few things to note:
-- The `func` option is required for creating functional index, and it should
be a valid Spark SQL function. Currently,
- only the functions that take a single column as input are supported. Some
useful functions that are supported are listed below.
+The `expr` option is required for creating an expression index, and it should be a valid Spark SQL function. Please check the syntax
+for these functions in the [Spark SQL documentation](https://spark.apache.org/docs/latest/sql-ref-functions.html) and provide the options
+accordingly. For example, the `format` option is required for the `from_unixtime` function.
+
+Some useful functions that are supported are listed below.
+
- `identity`
- `from_unixtime`
- `date_format`
@@ -322,17 +356,14 @@ Few things to note:
- `regexp_replace`
- `concat`
- `length`
-- Please check the syntax for the above functions in
- the [Spark SQL
documentation](https://spark.apache.org/docs/latest/sql-ref-functions.html) and
provide the options
- accordingly. For example, the `format` option is required for
`from_unixtime` function.
-- UDFs are not supported.
+
+Note that only functions that take a single column as input are currently supported; UDFs are not supported.
<details>
- <summary>Example of creating and using functional index</summary>
+ <summary>Full example of creating and using expression index</summary>
```sql
--- create a Hudi table
-CREATE TABLE hudi_table_func_index (
+CREATE TABLE hudi_table_expr_index (
ts STRING,
uuid STRING,
rider STRING,
@@ -342,67 +373,67 @@ CREATE TABLE hudi_table_func_index (
) USING HUDI
tblproperties (primaryKey = 'uuid')
PARTITIONED BY (city)
-location 'file:///tmp/hudi_table_func_index';
-
--- disable small file handling so the each insert creates new file --
-set hoodie.parquet.small.file.limit=0;
-
-INSERT INTO hudi_table_func_index VALUES ('2023-09-20
03:58:59','334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco');
-INSERT INTO hudi_table_func_index VALUES ('2023-09-19
08:46:34','e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70
,'san_francisco');
-INSERT INTO hudi_table_func_index VALUES ('2023-09-18
17:45:31','9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90
,'san_francisco');
-INSERT INTO hudi_table_func_index VALUES ('2023-09-22
13:12:56','1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco');
-INSERT INTO hudi_table_func_index VALUES ('2023-09-24
06:15:45','e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo');
-INSERT INTO hudi_table_func_index VALUES ('2023-09-22
15:21:36','7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40
,'sao_paulo');
-INSERT INTO hudi_table_func_index VALUES ('2023-09-20
12:35:45','3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06
,'chennai');
-INSERT INTO hudi_table_func_index VALUES ('2023-09-19
05:34:56','c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
-
--- Query with hour function filter but no idex yet --
-spark-sql> SELECT city, fare, rider, driver FROM hudi_table_func_index WHERE
city NOT IN ('chennai') AND hour(ts) > 12;
+location 'file:///tmp/hudi_table_expr_index';
+
+-- Query with hour function filter but no index yet --
+spark-sql> SELECT city, fare, rider, driver FROM hudi_table_expr_index WHERE
city NOT IN ('chennai') AND hour(ts) > 12;
san_francisco 93.5 rider-E driver-O
san_francisco 33.9 rider-D driver-L
sao_paulo 43.4 rider-G driver-Q
Time taken: 0.208 seconds, Fetched 3 row(s)
-spark-sql> EXPLAIN COST SELECT city, fare, rider, driver FROM
hudi_table_func_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
+spark-sql> EXPLAIN COST SELECT city, fare, rider, driver FROM
hudi_table_expr_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
== Optimized Logical Plan ==
Project [city#3465, fare#3464, rider#3462, driver#3463],
Statistics(sizeInBytes=899.5 KiB)
+- Filter ((isnotnull(city#3465) AND isnotnull(ts#3460)) AND (NOT (city#3465 =
chennai) AND (hour(cast(ts#3460 as timestamp), Some(Asia/Kolkata)) > 12))),
Statistics(sizeInBytes=2.5 MiB)
- +- Relation
default.hudi_table_func_index[_hoodie_commit_time#3455,_hoodie_commit_seqno#3456,_hoodie_record_key#3457,_hoodie_partition_path#3458,_hoodie_file_name#3459,ts#3460,uuid#3461,rider#3462,driver#3463,fare#3464,city#3465]
parquet, Statistics(sizeInBytes=2.5 MiB)
+ +- Relation
default.hudi_table_expr_index[_hoodie_commit_time#3455,_hoodie_commit_seqno#3456,_hoodie_record_key#3457,_hoodie_partition_path#3458,_hoodie_file_name#3459,ts#3460,uuid#3461,rider#3462,driver#3463,fare#3464,city#3465]
parquet, Statistics(sizeInBytes=2.5 MiB)
== Physical Plan ==
*(1) Project [city#3465, fare#3464, rider#3462, driver#3463]
+- *(1) Filter (isnotnull(ts#3460) AND (hour(cast(ts#3460 as timestamp),
Some(Asia/Kolkata)) > 12))
+- *(1) ColumnarToRow
- +- FileScan parquet
default.hudi_table_func_index[ts#3460,rider#3462,driver#3463,fare#3464,city#3465]
Batched: true, DataFilters: [isnotnull(ts#3460), (hour(cast(ts#3460 as
timestamp), Some(Asia/Kolkata)) > 12)], Format: Parquet, Location:
HoodieFileIndex(1 paths)[file:/tmp/hudi_table_func_index], PartitionFilters:
[isnotnull(city#3465), NOT (city#3465 = chennai)], PushedFilters:
[IsNotNull(ts)], ReadSchema:
struct<ts:string,rider:string,driver:string,fare:double>
+ +- FileScan parquet
default.hudi_table_expr_index[ts#3460,rider#3462,driver#3463,fare#3464,city#3465]
Batched: true, DataFilters: [isnotnull(ts#3460), (hour(cast(ts#3460 as
timestamp), Some(Asia/Kolkata)) > 12)], Format: Parquet, Location:
HoodieFileIndex(1 paths)[file:/tmp/hudi_table_expr_index], PartitionFilters:
[isnotnull(city#3465), NOT (city#3465 = chennai)], PushedFilters:
[IsNotNull(ts)], ReadSchema:
struct<ts:string,rider:string,driver:string,fare:double>
--- create the functional index --
-CREATE INDEX ts_hour ON hudi_table_func_index USING column_stats(ts)
options(func='hour');
+-- create the expression index --
+CREATE INDEX ts_hour ON hudi_table_expr_index USING column_stats(ts)
options(expr='hour');
-- query after creating the index --
-spark-sql> SELECT city, fare, rider, driver FROM hudi_table_func_index WHERE
city NOT IN ('chennai') AND hour(ts) > 12;
+spark-sql> SELECT city, fare, rider, driver FROM hudi_table_expr_index WHERE
city NOT IN ('chennai') AND hour(ts) > 12;
san_francisco 93.5 rider-E driver-O
san_francisco 33.9 rider-D driver-L
sao_paulo 43.4 rider-G driver-Q
Time taken: 0.202 seconds, Fetched 3 row(s)
-spark-sql> EXPLAIN COST SELECT city, fare, rider, driver FROM
hudi_table_func_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
+spark-sql> EXPLAIN COST SELECT city, fare, rider, driver FROM
hudi_table_expr_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
== Optimized Logical Plan ==
Project [city#2970, fare#2969, rider#2967, driver#2968],
Statistics(sizeInBytes=449.8 KiB)
+- Filter ((isnotnull(city#2970) AND isnotnull(ts#2965)) AND (NOT (city#2970 =
chennai) AND (hour(cast(ts#2965 as timestamp), Some(Asia/Kolkata)) > 12))),
Statistics(sizeInBytes=1278.3 KiB)
- +- Relation
default.hudi_table_func_index[_hoodie_commit_time#2960,_hoodie_commit_seqno#2961,_hoodie_record_key#2962,_hoodie_partition_path#2963,_hoodie_file_name#2964,ts#2965,uuid#2966,rider#2967,driver#2968,fare#2969,city#2970]
parquet, Statistics(sizeInBytes=1278.3 KiB)
+ +- Relation
default.hudi_table_expr_index[_hoodie_commit_time#2960,_hoodie_commit_seqno#2961,_hoodie_record_key#2962,_hoodie_partition_path#2963,_hoodie_file_name#2964,ts#2965,uuid#2966,rider#2967,driver#2968,fare#2969,city#2970]
parquet, Statistics(sizeInBytes=1278.3 KiB)
== Physical Plan ==
*(1) Project [city#2970, fare#2969, rider#2967, driver#2968]
+- *(1) Filter (isnotnull(ts#2965) AND (hour(cast(ts#2965 as timestamp),
Some(Asia/Kolkata)) > 12))
+- *(1) ColumnarToRow
- +- FileScan parquet
default.hudi_table_func_index[ts#2965,rider#2967,driver#2968,fare#2969,city#2970]
Batched: true, DataFilters: [isnotnull(ts#2965), (hour(cast(ts#2965 as
timestamp), Some(Asia/Kolkata)) > 12)], Format: Parquet, Location:
HoodieFileIndex(1 paths)[file:/tmp/hudi_table_func_index], PartitionFilters:
[isnotnull(city#2970), NOT (city#2970 = chennai)], PushedFilters:
[IsNotNull(ts)], ReadSchema:
struct<ts:string,rider:string,driver:string,fare:double>
+ +- FileScan parquet
default.hudi_table_expr_index[ts#2965,rider#2967,driver#2968,fare#2969,city#2970]
Batched: true, DataFilters: [isnotnull(ts#2965), (hour(cast(ts#2965 as
timestamp), Some(Asia/Kolkata)) > 12)], Format: Parquet, Location:
HoodieFileIndex(1 paths)[file:/tmp/hudi_table_expr_index], PartitionFilters:
[isnotnull(city#2970), NOT (city#2970 = chennai)], PushedFilters:
[IsNotNull(ts)], ReadSchema:
struct<ts:string,rider:string,driver:string,fare:double>
```
</details>
-#### Create Partition Stats and Secondary Index (Experimental)
+### Create Partition Stats Index
+
+Partition stats index is similar to the column stats index, in the sense that it tracks `min, max, null, count, ..` statistics on columns in the
+table, useful in query planning. The key difference is that, while `column_stats` tracks statistics about files, the partition stats index
+tracks aggregated statistics at the storage partition path level, to help skip entire folder paths more efficiently during query planning
+and execution.
+
+To enable the partition stats index, simply set `hoodie.metadata.index.partition.stats.enable = 'true'` in the create table options.
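+
+For instance, a minimal sketch of enabling it at create time (the table and column names here are illustrative; partition stats builds on column stats, and the column list must be specified):
+
+```sql
+CREATE TABLE hudi_table_pstats (
+  ts BIGINT,
+  uuid STRING,
+  rider STRING,
+  city STRING
+) USING HUDI
+TBLPROPERTIES (
+  primaryKey = 'uuid',
+  hoodie.metadata.index.column.stats.enable = 'true',      -- partition stats leverages column stats
+  hoodie.metadata.index.partition.stats.enable = 'true',   -- enable partition stats
+  hoodie.metadata.index.column.stats.column.list = 'rider' -- columns to track stats for
+)
+PARTITIONED BY (city);
+```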
-Hudi supports various [indexes](/docs/next/metadata#metadata-table-indices).
Let us see how we can use them in the following example.
+### Create Secondary Index
+
+Secondary indexes are record-level indexes built on any column in the table. They efficiently support multiple records having the same
+secondary column value and are built on top of the existing record level index on the table's record key.
+Secondary indexes are hash-based indexes that offer horizontally scalable write performance by splitting the key space into shards
+by hashing, as well as fast lookups by employing row-based file formats.
```sql
DROP TABLE IF EXISTS hudi_table;
@@ -420,7 +451,9 @@ CREATE TABLE hudi_table (
primaryKey ='id',
hoodie.metadata.record.index.enable = 'true', -- enable record index
hoodie.metadata.index.partition.stats.enable = 'true', -- enable partition
stats index
- hoodie.metadata.index.column.stats.column.list = 'rider' -- create
partition stats index on rider column
+ hoodie.metadata.index.column.stats.enable = 'true', -- enable column stats
+ hoodie.metadata.index.column.stats.column.list = 'rider', -- create column
stats index on rider column
+ hoodie.write.record.merge.mode = "COMMIT_TIME_ORDERING" -- enable commit
time ordering, required for secondary index
)
PARTITIONED BY (city, state)
LOCATION 'file:///tmp/hudi_test_table';
@@ -429,10 +462,6 @@ INSERT INTO hudi_table VALUES
(1695159649,'trip1','rider-A','driver-K',19.10,'sa
INSERT INTO hudi_table VALUES
(1695091554,'trip2','rider-C','driver-M',27.70,'sunnyvale','california');
INSERT INTO hudi_table VALUES
(1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas');
INSERT INTO hudi_table VALUES
(1695516137,'trip4','rider-F','driver-P',34.15,'houston','texas');
-
--- Enable data skipping for the reader
-set hoodie.metadata.enable=true;
-set hoodie.enable.data.skipping=true;
-- simple partition predicate --
select * from hudi_table where city = 'sunnyvale';
@@ -457,7 +486,7 @@ trip1 rider-A driver-K
Time taken: 0.368 seconds, Fetched 1 row(s)
-- create secondary index on driver --
-CREATE INDEX driver_idx ON hudi_table USING secondary_index(driver);
+CREATE INDEX driver_idx ON hudi_table (driver);
-- secondary key predicate --
SELECT id, driver, city, state FROM hudi_table WHERE driver IN ('driver-K',
'driver-M');
@@ -466,19 +495,29 @@ trip2 driver-M sunnyvale california
Time taken: 0.83 seconds, Fetched 2 row(s)
```
-**Limitations of using these indexes:**
+### Create Bloom Filter Index
+
+Bloom filter indexes store a bloom filter per file, on the column or column expression being indexed. They can be very
+effective in skipping files that don't contain a given high-cardinality column value, e.g. UUIDs.
+
+```sql
+CREATE INDEX idx_bloom_driver ON hudi_indexed_table USING
bloom_filters(driver) OPTIONS(expr='identity');
+CREATE INDEX idx_bloom_rider ON hudi_indexed_table USING bloom_filters(rider)
OPTIONS(expr='lower');
+```
+
+
+### Limitations
- Unlike column stats, partition stats index is not created automatically for
all columns. Users must specify list of
columns for which they want to create partition stats index.
- Predicate on internal meta fields such as `_hoodie_record_key` or
`_hoodie_partition_path` cannot be used for data
skipping. Queries with such predicates cannot leverage the indexes.
- Secondary index is not supported for nested fields.
+- Secondary index can be created only if record index is available in the table.
+- Secondary index can only be used for tables using the OverwriteWithLatestAvroPayload payload or COMMIT_TIME_ORDERING merge mode.
+- Column stats expression index cannot be created using the `identity` expression with SQL. Users can leverage the column stats index using the Datasource instead.
- Index update can fail with schema evolution.
-- If there are multiple indexes present, then secondary index and functional
index update can fail.
- Only one index can be created at a time using [async
indexer](/docs/next/metadata_indexing).
-- Ensure native HFile reader is disabled (`_hoodie.hfile.use.native.reader`)
to leverage the secondary index. Default value for this config is `false`.
-
-Limitations will be addressed before 1.0.0 is made generally available.
### Setting Hudi configs
@@ -790,6 +829,68 @@ WITH (
);
```
+### Create Table in Non-Blocking Concurrency Control Mode
+
+The following is an example of creating a Flink table in [Non-Blocking
Concurrency Control
mode](/docs/next/concurrency_control#non-blocking-concurrency-control).
+
+```sql
+-- This is a datagen source that can generate records continuously
+CREATE TABLE sourceT (
+ uuid VARCHAR(20),
+ name VARCHAR(10),
+ age INT,
+ ts TIMESTAMP(3),
+ `partition` AS 'par1'
+) WITH (
+ 'connector' = 'datagen',
+ 'rows-per-second' = '200'
+);
+
+-- pipeline1: by default, enable the compaction and cleaning services
+CREATE TABLE t1 (
+ uuid VARCHAR(20),
+ name VARCHAR(10),
+ age INT,
+ ts TIMESTAMP(3),
+ `partition` VARCHAR(20)
+) WITH (
+ 'connector' = 'hudi',
+ 'path' = '/tmp/hudi-demo/t1',
+ 'table.type' = 'MERGE_ON_READ',
+ 'index.type' = 'BUCKET',
+ 'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
+ 'write.tasks' = '2'
+);
+
+-- pipeline2: disable the compaction and cleaning services manually
+CREATE TABLE t1_2 (
+ uuid VARCHAR(20),
+ name VARCHAR(10),
+ age INT,
+ ts TIMESTAMP(3),
+ `partition` VARCHAR(20)
+) WITH (
+ 'connector' = 'hudi',
+ 'path' = '/tmp/hudi-demo/t1',
+ 'table.type' = 'MERGE_ON_READ',
+ 'index.type' = 'BUCKET',
+ 'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
+ 'write.tasks' = '2',
+ 'compaction.schedule.enabled' = 'false',
+ 'compaction.async.enabled' = 'false',
+ 'clean.async.enabled' = 'false'
+);
+
+-- Submit the pipelines
+INSERT INTO t1
+SELECT * FROM sourceT;
+
+INSERT INTO t1_2
+SELECT * FROM sourceT;
+
+SELECT * FROM t1 LIMIT 20;
+```
+
### Alter Table
```sql
ALTER TABLE tableA RENAME TO tableB;
diff --git a/website/docs/sql_dml.md b/website/docs/sql_dml.md
index 04590765f3f..43d5d940fb3 100644
--- a/website/docs/sql_dml.md
+++ b/website/docs/sql_dml.md
@@ -167,6 +167,51 @@ For a Hudi table with user configured primary keys, the
join condition in `Merge
For a Table where Hudi auto generates primary keys, the join condition in MIT
can be on any arbitrary data columns.
:::
+### Merge Into with Partial Updates {#merge-into-partial-update}
+
+Partial updates write only the updated columns instead of full change records. This is useful when you have wide tables (typical for ML feature stores)
+with hundreds of columns and only a few columns are updated. It reduces write amplification as well as helps lower query
+latency. The `MERGE INTO` statement above can be modified to use partial updates as shown below.
+
+```sql
+-- Create a Merge-on-Read table
+CREATE TABLE tableName (
+ id INT,
+ name STRING,
+ price DOUBLE,
+ _ts LONG,
+ description STRING
+) USING hudi
+TBLPROPERTIES (
+ type = 'mor',
+ primaryKey = 'id',
+ preCombineField = '_ts'
+)
+LOCATION '/location/to/basePath';
+
+-- Insert values into the table
+INSERT INTO tableName VALUES
+ (1, 'a1', 10, 1000, 'a1: desc1'),
+ (2, 'a2', 20, 1200, 'a2: desc2'),
+ (3, 'a3', 30, 1250, 'a3: desc3');
+
+-- Perform partial updates using a MERGE INTO statement
+MERGE INTO tableName t0
+ USING (
+ SELECT 1 AS id, 'a1' AS name, 12 AS price, 1001 AS ts
+ UNION ALL
+ SELECT 3 AS id, 'a3' AS name, 25 AS price, 1260 AS ts
+ ) s0
+ ON t0.id = s0.id
+ WHEN MATCHED THEN UPDATE SET
+ price = s0.price,
+ _ts = s0.ts;
+
+SELECT id, name, price, _ts, description FROM tableName;
+```
+
+Notice that instead of `UPDATE SET *`, we update only the `price` and `_ts` columns.
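+
+Assuming the statements above succeed, the final `SELECT` should show the partial update applied on top of the existing rows, with untouched columns such as `description` preserved (expected values sketched as comments, not verified output):
+
+```sql
+-- expected result of the final SELECT (columns: id, name, price, _ts, description)
+-- 1   a1   12.0   1001   a1: desc1
+-- 2   a2   20.0   1200   a2: desc2
+-- 3   a3   25.0   1260   a3: desc3
+```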
+
### Delete From
You can remove data from a Hudi table using the `DELETE FROM` statement.
@@ -194,9 +239,7 @@ DML operations can be sped up using column statistics for
data skipping and usin
For e.g. the following helps speed up the `DELETE` operation on a Hudi table,
by using the record level index.
```sql
-SET hoodie.enable.data.skipping=true;
SET hoodie.metadata.record.index.enable=true;
-SET hoodie.metadata.enable=true;
DELETE from hudi_table where uuid = 'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa';
```
diff --git a/website/docs/sql_queries.md b/website/docs/sql_queries.md
index 350a372bf35..c03edb77cfd 100644
--- a/website/docs/sql_queries.md
+++ b/website/docs/sql_queries.md
@@ -38,6 +38,225 @@ using path filters. We expect that native integration with
Spark's optimized tab
management will yield great performance benefits in those versions.
:::
+### Snapshot Query without Index Acceleration
+
+In this section, we go over the various indexes and how they help data skipping in Hudi. We first create
+a Hudi table without any index.
+
+```sql
+-- Create a table with primary key
+CREATE TABLE hudi_indexed_table (
+ ts BIGINT,
+ uuid STRING,
+ rider STRING,
+ driver STRING,
+ fare DOUBLE,
+ city STRING
+) USING HUDI
+options(
+ primaryKey ='uuid',
+ hoodie.write.record.merge.mode = "COMMIT_TIME_ORDERING"
+)
+PARTITIONED BY (city);
+
+INSERT INTO hudi_indexed_table
+VALUES
+(1695159649,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
+(1695091554,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70
,'san_francisco'),
+(1695046462,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90
,'san_francisco'),
+(1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
+(1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'
),
+(1695376420,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40
,'sao_paulo' ),
+(1695173887,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06
,'chennai' ),
+(1695115999,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
+UPDATE hudi_indexed_table SET rider = 'rider-B', driver = 'driver-N', ts =
'1697516137' WHERE rider = 'rider-A';
+```
+
+With the query below, we see no data skipping or pruning, since no index has been created on the table yet, as can
+be seen in the image below. All the files in the table are scanned to fetch the data. Let's create a secondary index on the rider column.
+
+```sql
+SHOW INDEXES FROM hudi_indexed_table;
+SELECT * FROM hudi_indexed_table WHERE rider = 'rider-B';
+```
+
+
+<p align = "left">Figure: Query pruning without secondary index</p>
+
+### Query using Secondary Index
+
+We run the query again after creating a secondary index on the rider column. The query now
+shows 1 file scanned, compared to 3 files scanned without the index.
+
+```sql
+-- We will first create a record index since secondary index is dependent upon
it
+CREATE INDEX record_index ON hudi_indexed_table (uuid);
+-- We create a secondary index on rider column
+CREATE INDEX idx_rider ON hudi_indexed_table (rider);
+-- We run the same query again
+SELECT * FROM hudi_indexed_table WHERE rider = 'rider-B';
+DROP INDEX record_index on hudi_indexed_table;
+DROP INDEX secondary_index_idx_rider on hudi_indexed_table;
+```
+
+
+<p align = "left">Figure: Query pruning with secondary index</p>
+
+### Query using Bloom Filter Expression Index
+
+With the query below, we see no data skipping or pruning, since no index has been created on the `driver` column yet.
+All the files in the table are scanned to fetch the data.
+
+```sql
+SHOW INDEXES FROM hudi_indexed_table;
+SELECT * FROM hudi_indexed_table WHERE driver = 'driver-N';
+```
+
+
+<p align = "left">Figure: Query pruning without bloom filter expression
index</p>
+
+We run the query again after creating a bloom filter expression index on the driver column. The query now
+shows 1 file scanned, compared to 3 files scanned without the index.
+
+```sql
+-- We create a bloom filter expression index on driver column
+CREATE INDEX idx_bloom_driver ON hudi_indexed_table USING
bloom_filters(driver) OPTIONS(expr='identity');
+-- We run the same query again
+SELECT * FROM hudi_indexed_table WHERE driver = 'driver-N';
+DROP INDEX expr_index_idx_bloom_driver on hudi_indexed_table;
+```
+
+
+<p align = "left">Figure: Query pruning with bloom filter expression index</p>
+
+### Query using Column Stats Expression Index
+
+With the query below, we see no data skipping or pruning, since no such index has been created on the table yet, as can
+be seen in the image below. All the files in the table are scanned to fetch the data.
+
+```sql
+SHOW INDEXES FROM hudi_indexed_table;
+SELECT uuid, rider FROM hudi_indexed_table WHERE from_unixtime(ts,
'yyyy-MM-dd') = '2023-10-17';
+```
+
+
+<p align = "left">Figure: Query pruning without column stat expression
index</p>
+
+We run the query again after creating a column stats expression index on the ts column. The query now
+shows 1 file scanned, compared to 3 files scanned without the index.
+
+```sql
+-- We create a column stat expression index on ts column
+CREATE INDEX idx_column_ts ON hudi_indexed_table USING column_stats(ts)
OPTIONS(expr='from_unixtime', format = 'yyyy-MM-dd');
+-- We run the same query again
+SELECT uuid, rider FROM hudi_indexed_table WHERE from_unixtime(ts,
'yyyy-MM-dd') = '2023-10-17';
+DROP INDEX expr_index_idx_column_ts on hudi_indexed_table;
+```
+
+
+<p align = "left">Figure: Query pruning with column stat expression index</p>
+
+### Query using Partition Stats Index
+
+With the query below, we see no data skipping or pruning, since no partition stats index has been created on the table yet, as can
+be seen in the image below. All the partitions in the table are scanned to fetch the data.
+
+```sql
+SHOW INDEXES FROM hudi_indexed_table;
+SELECT * FROM hudi_indexed_table WHERE rider >= 'rider-H';
+```
+
+
+<p align = "left">Figure: Query pruning without partition stats index</p>
+
+We run the query again after creating the partition stats index. The query now shows 1 partition scanned,
+compared to 3 partitions scanned without the index.
+
+```sql
+-- We will need to enable column stats as well since partition stats index
leverages it
+SET hoodie.metadata.index.partition.stats.enable=true;
+SET hoodie.metadata.index.column.stats.enable=true;
+INSERT INTO hudi_indexed_table
+VALUES
+(1695159649,'854g46e0-8355-45cc-97c6-c31daf0df330','rider-H','driver-T',19.10,'chennai');
+-- Run the query again on the table with partition stats index
+SELECT * FROM hudi_indexed_table WHERE rider >= 'rider-H';
+DROP INDEX column_stats on hudi_indexed_table;
+DROP INDEX partition_stats on hudi_indexed_table;
+```
+
+
+<p align = "left">Figure: Query pruning with partition stats index</p>
+
+### Snapshot Query with Event Time Ordering
+
+Hudi supports different [record merge modes](/docs/next/record_merger) for merging records with the same key. Event
+time ordering is one of the merge modes, where records are merged based on the event time. Let's create a table with
+the event time ordering merge mode.
+
+```sql
+CREATE TABLE IF NOT EXISTS hudi_table_merge_mode (
+ id INT,
+ name STRING,
+ ts LONG,
+ price DOUBLE
+) USING hudi
+TBLPROPERTIES (
+ type = 'mor',
+ primaryKey = 'id',
+ precombineField = 'ts',
+ recordMergeMode = 'EVENT_TIME_ORDERING'
+)
+LOCATION 'file:///tmp/hudi_table_merge_mode/';
+
+-- insert a record
+INSERT INTO hudi_table_merge_mode VALUES (1, 'a1', 1000, 10.0);
+
+-- another record with the same key but lower ts
+INSERT INTO hudi_table_merge_mode VALUES (1, 'a1', 900, 20.0);
+
+-- query the table, result should be id=1, name=a1, ts=1000, price=10.0
+SELECT id, name, ts, price FROM hudi_table_merge_mode;
+```
+
+With `EVENT_TIME_ORDERING`, the record with the larger event time
(`precombineField`) overwrites the record with the
+smaller event time on the same key, regardless of transaction time.
+
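+For contrast, here is a sketch of the same pair of writes under `COMMIT_TIME_ORDERING` (table name and location are illustrative), where the later commit wins regardless of the `ts` value:
+
+```sql
+CREATE TABLE IF NOT EXISTS hudi_table_merge_mode_cto (
+  id INT,
+  name STRING,
+  ts LONG,
+  price DOUBLE
+) USING hudi
+TBLPROPERTIES (
+  type = 'mor',
+  primaryKey = 'id',
+  recordMergeMode = 'COMMIT_TIME_ORDERING'
+)
+LOCATION 'file:///tmp/hudi_table_merge_mode_cto/';
+
+INSERT INTO hudi_table_merge_mode_cto VALUES (1, 'a1', 1000, 10.0);
+
+-- lower ts, but later commit: with commit time ordering this write wins
+INSERT INTO hudi_table_merge_mode_cto VALUES (1, 'a1', 900, 20.0);
+
+-- query the table, result should be id=1, name=a1, ts=900, price=20.0
+SELECT id, name, ts, price FROM hudi_table_merge_mode_cto;
+```
+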
+### Snapshot Query with Custom Merge Mode
+
+Users can set the `CUSTOM` merge mode to provide their own merge logic. With `CUSTOM` merge mode, you also need to provide a
+payload class that implements the merge logic. For example, you can use `PartialUpdateAvroPayload` to merge the records
+as below.
+
+```sql
+CREATE TABLE IF NOT EXISTS hudi_table_merge_mode_custom (
+ id INT,
+ name STRING,
+ ts LONG,
+ price DOUBLE
+) USING hudi
+TBLPROPERTIES (
+ type = 'mor',
+ primaryKey = 'id',
+ precombineField = 'ts',
+ recordMergeMode = 'CUSTOM',
+ 'hoodie.datasource.write.payload.class' =
'org.apache.hudi.common.model.PartialUpdateAvroPayload'
+)
+LOCATION 'file:///tmp/hudi_table_merge_mode_custom/';
+
+-- insert a record
+INSERT INTO hudi_table_merge_mode_custom VALUES (1, 'a1', 1000, 10.0);
+
+-- another record with the same key but set higher ts and name as null to show
partial update
+INSERT INTO hudi_table_merge_mode_custom VALUES (1, null, 2000, 20.0);
+
+-- query the table, result should be id=1, name=a1, ts=2000, price=20.0
+SELECT id, name, ts, price FROM hudi_table_merge_mode_custom;
+```
+
+As you can see, not only does the record with the higher ordering value overwrite the record with the lower ordering value, but
+the name field is also partially updated.
+
### Time Travel Query
You can also query the table at a specific commit time using the `AS OF`
syntax. This is useful for debugging and auditing purposes, as well as for
@@ -66,6 +285,14 @@ FROM hudi_table_changes(
)
```
+:::note CDC Query Checkpointing between Hudi 0.x and 1.x
+In Hudi 1.0, incremental and CDC queries switch to using completion time, instead of requested instant time, to determine the
+range of commits to incrementally pull from. The checkpoint stored for the Hudi incremental source and related sources is
+also changed to use completion time. To enable seamless migration without downtime or data duplication, Hudi does an automatic checkpoint
+translation from requested instant time to completion time depending on the source table version.
+:::
+
### Incremental Query
Incremental queries are useful when you want to obtain the latest values for
all records that have changed after a given commit time. They help author
incremental data pipelines with
@@ -92,6 +319,85 @@ see all changes in a given time window and not just the
latest values.
Please refer to [configurations](/docs/basic_configurations) section for the
important configuration options.
+:::note Incremental Query Checkpointing between Hudi 0.x and 1.x
+In Hudi 1.0, incremental and CDC queries switch to using completion time, instead of instant time, to determine the
+range of commits to incrementally pull from. The checkpoint stored for the Hudi incremental source and related sources is
+also changed to use completion time. To ensure compatibility, Hudi does a checkpoint translation from requested instant
+time to completion time depending on the source table version.
+:::
+
+### Query Indexes and Timeline
+
+Hudi also allows users to directly query the metadata table partitions and check the metadata corresponding to the table
+and the various indexes. In this section, we go over the various queries that can be used for this purpose.
+
+Let's first create a table with various indexes created.
+```sql
+-- Create a table with primary key
+CREATE TABLE hudi_indexed_table (
+ ts BIGINT,
+ uuid STRING,
+ rider STRING,
+ driver STRING,
+ fare DOUBLE,
+ city STRING
+) USING HUDI
+options(
+ primaryKey ='uuid',
+ hoodie.write.record.merge.mode = "COMMIT_TIME_ORDERING"
+)
+PARTITIONED BY (city);
+
+-- Create partition stat index
+SET hoodie.metadata.index.partition.stats.enable=true;
+SET hoodie.metadata.index.column.stats.enable=true;
+
+INSERT INTO hudi_indexed_table
+VALUES
+(1695159649,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
+(1695091554,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70
,'san_francisco'),
+(1695046462,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90
,'san_francisco'),
+(1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
+(1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'
),
+(1695376420,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40
,'sao_paulo' ),
+(1695173887,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06
,'chennai' ),
+(1695115999,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
+
+-- Create a column stats expression index on the ts column
+CREATE INDEX idx_column_ts ON hudi_indexed_table USING column_stats(ts) OPTIONS(expr='from_unixtime', format='yyyy-MM-dd');
+-- Enable the record level index and create it on the record key (uuid)
+SET hoodie.metadata.record.index.enable=true;
+CREATE INDEX record_index ON hudi_indexed_table (uuid);
+-- Create a secondary index on the rider column
+CREATE INDEX idx_rider ON hudi_indexed_table (rider);
+```
+
+```sql
+-- Query the secondary keys stored in the secondary index partition
+SELECT key FROM hudi_metadata('hudi_indexed_table') WHERE type=7;
+
+-- Query the column stats records stored in the column stats index or the column stats expression index
+SELECT ColumnStatsMetadata.columnName, ColumnStatsMetadata.minValue, ColumnStatsMetadata.maxValue FROM hudi_metadata('hudi_indexed_table') WHERE type=3;
+-- The query can be further refined to fetch nested fields and exact values for a particular column.
+-- The query below fetches the column stats metadata for the expression index on the ts column.
+SELECT ColumnStatsMetadata.columnName, ColumnStatsMetadata.minValue.member6.value, ColumnStatsMetadata.maxValue.member6.value FROM hudi_metadata('hudi_indexed_table') WHERE type=3 AND ColumnStatsMetadata.columnName='ts';
+
+-- Query the partition stats index records for the rider column. Partition stats
+-- index records use the same schema as column stats index records.
+SELECT ColumnStatsMetadata.columnName, ColumnStatsMetadata.minValue.member6.value, ColumnStatsMetadata.maxValue.member6.value FROM hudi_metadata('hudi_indexed_table') WHERE type=6 AND ColumnStatsMetadata.columnName='rider';
+```
+
+All the different index types can be queried by specifying the `type` column value
+for that index. The table below lists the metadata table partitions and the
+corresponding `type` column values.
+
+| MDT Partition | Type Column Value |
+|:------------------:|:-------------------:|
+| Files | 2 |
+| Column Stats | 3 |
+| Bloom Filters | 4 |
+| Record Index | 5 |
+| Partition Stats | 6 |
+| Secondary Index | 7 |
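+
+Following this mapping, the same pattern applies to the other partitions. For example (assuming the `hudi_indexed_table` created earlier), the record level index and files partition can be inspected as follows:
+
+```sql
+-- List the record keys tracked by the record level index
+SELECT key FROM hudi_metadata('hudi_indexed_table') WHERE type=5;
+
+-- List the file metadata records from the files partition
+SELECT key FROM hudi_metadata('hudi_indexed_table') WHERE type=2;
+```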
+
+
## Flink SQL
Once Flink Hudi tables have been registered in the Flink catalog, they can be
queried using Flink SQL. All query types are supported across both Hudi table
types, relying on custom Hudi input formats, similar to Hive. Typically, notebook
users and Flink SQL CLI users leverage Flink SQL for querying Hudi tables. Please
add hudi-flink-bundle as described in the [Flink Quickstart](/docs/flink-quick-start-guide).
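
As a sketch (the path and schema below are illustrative), a Hudi table can be registered and queried from the Flink SQL CLI as follows:

```sql
-- Register a Hudi table backed by an existing (or new) table path
CREATE TABLE hudi_flink_table (
  uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
  rider VARCHAR(20),
  fare DOUBLE,
  city VARCHAR(20)
) PARTITIONED BY (city) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_flink_table',
  'table.type' = 'MERGE_ON_READ'
);

-- Snapshot query
SELECT * FROM hudi_flink_table;
```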
diff --git
a/website/static/assets/images/bloom-filter-expression-index-with-pruning.png
b/website/static/assets/images/bloom-filter-expression-index-with-pruning.png
new file mode 100644
index 00000000000..ce36b138519
Binary files /dev/null and
b/website/static/assets/images/bloom-filter-expression-index-with-pruning.png
differ
diff --git
a/website/static/assets/images/bloom-filter-expression-index-without-pruning.png
b/website/static/assets/images/bloom-filter-expression-index-without-pruning.png
new file mode 100644
index 00000000000..70aac41f2bb
Binary files /dev/null and
b/website/static/assets/images/bloom-filter-expression-index-without-pruning.png
differ
diff --git
a/website/static/assets/images/column-stat-expression-index-with-pruning.png
b/website/static/assets/images/column-stat-expression-index-with-pruning.png
new file mode 100644
index 00000000000..c665c0d3183
Binary files /dev/null and
b/website/static/assets/images/column-stat-expression-index-with-pruning.png
differ
diff --git
a/website/static/assets/images/column-stat-expression-index-without-pruning.png
b/website/static/assets/images/column-stat-expression-index-without-pruning.png
new file mode 100644
index 00000000000..d1d4e7f6aa3
Binary files /dev/null and
b/website/static/assets/images/column-stat-expression-index-without-pruning.png
differ
diff --git a/website/static/assets/images/partition-stat-index-with-pruning.png
b/website/static/assets/images/partition-stat-index-with-pruning.png
new file mode 100644
index 00000000000..2d7d7cb8790
Binary files /dev/null and
b/website/static/assets/images/partition-stat-index-with-pruning.png differ
diff --git
a/website/static/assets/images/partition-stat-index-without-pruning.png
b/website/static/assets/images/partition-stat-index-without-pruning.png
new file mode 100644
index 00000000000..22cfbcb8802
Binary files /dev/null and
b/website/static/assets/images/partition-stat-index-without-pruning.png differ
diff --git a/website/static/assets/images/secondary-index-with-pruning.png
b/website/static/assets/images/secondary-index-with-pruning.png
new file mode 100644
index 00000000000..111b9726ef7
Binary files /dev/null and
b/website/static/assets/images/secondary-index-with-pruning.png differ
diff --git a/website/static/assets/images/secondary-index-without-pruning.png
b/website/static/assets/images/secondary-index-without-pruning.png
new file mode 100644
index 00000000000..755d07368f2
Binary files /dev/null and
b/website/static/assets/images/secondary-index-without-pruning.png differ