This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 4b59d41 [HUDI-2878] enhance hudi-quick-start guide for spark-sql
(#4269)
4b59d41 is described below
commit 4b59d41e0762ad561f7fa1272859d48a0966dbec
Author: Yann Byron <[email protected]>
AuthorDate: Fri Dec 10 17:05:46 2021 +0800
[HUDI-2878] enhance hudi-quick-start guide for spark-sql (#4269)
---
website/docs/quick-start-guide.md | 417 +++++++++++++++++++++++++++++++-------
1 file changed, 342 insertions(+), 75 deletions(-)
diff --git a/website/docs/quick-start-guide.md
b/website/docs/quick-start-guide.md
index 4abd431..1aa2801 100644
--- a/website/docs/quick-start-guide.md
+++ b/website/docs/quick-start-guide.md
@@ -31,17 +31,17 @@ From the extracted directory run spark-shell with Hudi as:
```scala
// spark-shell for spark 3
spark-shell \
- --packages
org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1
\
+ --packages
org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:3.1.2
\
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
// spark-shell for spark 2 with scala 2.12
spark-shell \
- --packages
org.apache.hudi:hudi-spark-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:2.4.4
\
+ --packages
org.apache.hudi:hudi-spark-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:2.4.4
\
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
// spark-shell for spark 2 with scala 2.11
spark-shell \
- --packages
org.apache.hudi:hudi-spark-bundle_2.11:0.9.0,org.apache.spark:spark-avro_2.11:2.4.4
\
+ --packages
org.apache.hudi:hudi-spark-bundle_2.11:0.10.0,org.apache.spark:spark-avro_2.11:2.4.4
\
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
```
@@ -53,18 +53,18 @@ From the extracted directory run spark-sql with Hudi as:
```shell
# spark sql for spark 3
-spark-sql --packages
org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1
\
+spark-sql --packages
org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:3.1.2
\
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
# spark-sql for spark 2 with scala 2.11
-spark-sql --packages
org.apache.hudi:hudi-spark-bundle_2.11:0.9.0,org.apache.spark:spark-avro_2.11:2.4.4
\
+spark-sql --packages
org.apache.hudi:hudi-spark-bundle_2.11:0.10.0,org.apache.spark:spark-avro_2.11:2.4.4
\
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
# spark-sql for spark 2 with scala 2.12
spark-sql \
- --packages
org.apache.hudi:hudi-spark-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:2.4.4
\
+ --packages
org.apache.hudi:hudi-spark-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:2.4.4
\
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
```
@@ -80,17 +80,17 @@ export PYSPARK_PYTHON=$(which python3)
# for spark3
pyspark
---packages
org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1
+--packages
org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:3.1.2
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
# for spark2 with scala 2.12
pyspark
---packages
org.apache.hudi:hudi-spark-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:2.4.4
+--packages
org.apache.hudi:hudi-spark-bundle_2.12:0.10.0,org.apache.spark:spark-avro_2.12:2.4.4
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
# for spark2 with scala 2.11
pyspark
---packages
org.apache.hudi:hudi-spark-bundle_2.11:0.9.0,org.apache.spark:spark-avro_2.11:2.4.4
+--packages
org.apache.hudi:hudi-spark-bundle_2.11:0.10.0,org.apache.spark:spark-avro_2.11:2.4.4
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
```
@@ -100,7 +100,7 @@ pyspark
:::note Please note the following
<ul>
<li>spark-avro module needs to be specified in --packages as it is not
included with spark-shell by default</li>
- <li>spark-avro and spark versions must match (we have used 3.0.1 for both
above)</li>
+ <li>spark-avro and spark versions must match (we have used 3.1.2 for both
above)</li>
<li>we have used hudi-spark-bundle built for scala 2.12 since the spark-avro
module used also depends on 2.12.
If spark-avro_2.11 is used, correspondingly hudi-spark-bundle_2.11
needs to be used. </li>
</ul>
@@ -175,18 +175,161 @@ values={[
</TabItem>
<TabItem value="sparksql">
+Spark SQL needs an explicit create table command.
+
+**Table Concepts**
+- Table types:
+ Both of Hudi's table types (Copy-On-Write (COW) and Merge-On-Read (MOR)) can
be created using Spark SQL.
+
+ While creating the table, the table type can be specified using the **type** option: **type = 'cow'** creates a COW table, while **type = 'mor'** creates a MOR table.
+
+- Partitioned & Non-Partitioned table:
+ Users can create a partitioned table or a non-partitioned table in Spark SQL.
+ To create a partitioned table, use a **partitioned by** clause to specify the partition columns.
+ A create table command without a **partitioned by** clause creates a non-partitioned table.
+
+- Managed & External table:
+ In general, Spark SQL supports two kinds of tables, namely managed and external.
+ If a location is specified with a **location** clause, or the table is created explicitly with `create external table`, it is an external table; otherwise it is considered a managed table.
+ You can read more about external vs managed tables [here](https://sparkbyexamples.com/apache-hive/difference-between-hive-internal-tables-and-external-tables/).
+
+:::note
+1. Since hudi 0.10.0, `primaryKey` must be specified. This aligns Spark SQL with the Hudi datasource writer and resolves many behavioural discrepancies reported in previous versions.
+ Tables without a primary key are no longer supported; any hudi table created before 0.10.0 without a `primaryKey` needs to be recreated with a `primaryKey` field on 0.10.0.
+ As with `hoodie.datasource.write.recordkey.field`, hudi uses `uuid` as the default primaryKey, so if `uuid` is your table's primary key you can omit the `primaryKey` config in `tblproperties`.
+2. `primaryKey`, `preCombineField` and `type` are case-sensitive.
+3. To specify `primaryKey`, `preCombineField`, `type` or other hudi configs, `tblproperties` is preferred over `options`. Spark SQL syntax is detailed here.
+4. A new hudi table created by Spark SQL will, by default, set `hoodie.table.keygenerator.class` to `org.apache.hudi.keygen.ComplexKeyGenerator` and
+`hoodie.datasource.write.hive_style_partitioning` to `true`.
+:::
+
+Let's go over some of the create table commands.
+
+**Create a Non-Partitioned Table**
+
```sql
---
-create table if not exists hudi_table2(
- id int,
- name string,
+-- create a cow table, with default primaryKey 'uuid' and without
preCombineField provided
+create table hudi_cow_nonpcf_tbl (
+ uuid int,
+ name string,
price double
+) using hudi;
+
+
+-- create a mor non-partitioned table with preCombineField provided
+create table hudi_mor_tbl (
+ id int,
+ name string,
+ price double,
+ ts bigint
) using hudi
-options (
- type = 'cow'
+tblproperties (
+  type = 'mor',
+ primaryKey = 'id',
+ preCombineField = 'ts'
);
```
+**Create Partitioned Table**
+
+Here is an example of creating an external COW partitioned table.
+
+```sql
+-- create a partitioned, preCombineField-provided cow table
+create table hudi_cow_pt_tbl (
+ id bigint,
+ name string,
+ ts bigint,
+ dt string,
+ hh string
+) using hudi
+tblproperties (
+ type = 'cow',
+ primaryKey = 'id',
+ preCombineField = 'ts'
+ )
+partitioned by (dt, hh)
+location '/tmp/hudi/hudi_cow_pt_tbl';
+```
+
+**Create Table for an existing Hudi Table**
+
+We can create a table on top of an existing hudi table (created with spark-shell or deltastreamer). This is useful for
+reading from and writing to a pre-existing hudi table.
+
+```sql
+-- create an external hudi table based on an existing path
+
+-- for non-partitioned table
+create table hudi_existing_tbl0 using hudi
+location 'file:///tmp/hudi/dataframe_hudi_nonpt_table';
+
+-- for partitioned table
+create table hudi_existing_tbl1 using hudi
+partitioned by (dt, hh)
+location 'file:///tmp/hudi/dataframe_hudi_pt_table';
+```
+
+:::tip
+You don't need to specify the schema or any properties except the partition columns, if the table is partitioned. Hudi automatically recognizes the schema and configurations.
+:::
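+
+As a quick sanity check, you can inspect what Hudi recovered from the existing path; this is a hedged sketch using standard Spark SQL commands against the table created above:
+
+```sql
+-- inspect the recovered schema and partitioning
+describe table hudi_existing_tbl1;
+show create table hudi_existing_tbl1;
+```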
+
+**CTAS**
+
+Hudi supports CTAS (Create Table As Select) on Spark SQL. <br/>
+Note: For better performance when loading data into a hudi table, CTAS uses **bulk insert** as the write operation.
+
+Example CTAS command to create a non-partitioned COW table without
preCombineField.
+
+```sql
+-- CTAS: create a non-partitioned cow table without preCombineField
+create table hudi_ctas_cow_nonpcf_tbl
+using hudi
+tblproperties (primaryKey = 'id')
+as
+select 1 as id, 'a1' as name, 10 as price;
+```
+
+Example CTAS command to create a partitioned, primary key COW table.
+
+```sql
+-- CTAS: create a partitioned, preCombineField-provided cow table
+create table hudi_ctas_cow_pt_tbl
+using hudi
+tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
+partitioned by (dt)
+as
+select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;
+
+```
+
+Example CTAS command to load data from another table.
+
+```sql
+-- create a managed parquet table
+create table parquet_mngd using parquet location
'file:///tmp/parquet_dataset/*.parquet';
+
+-- CTAS by loading data into the hudi table
+create table hudi_ctas_cow_pt_tbl2 using hudi location
'file:/tmp/hudi/hudi_tbl/' options (
+ type = 'cow',
+ primaryKey = 'id',
+ preCombineField = 'ts'
+ )
+partitioned by (datestr) as select * from parquet_mngd;
+```
+
+**Create Table Properties**
+
+Users can set table properties while creating a hudi table. Critical options
are listed here.
+
+| Parameter Name | Default | Description |
+|------------------|--------|------------|
+| primaryKey | uuid | The primary key field names of the table, separated by commas. Same as `hoodie.datasource.write.recordkey.field` |
+| preCombineField | | The pre-combine field of the table. Same as `hoodie.datasource.write.precombine.field` |
+| type | cow | The table type to create. type = 'cow' means a COPY-ON-WRITE table, while type = 'mor' means a MERGE-ON-READ table. Same as `hoodie.datasource.write.table.type` |
+
+To set any custom hudi config (like index type, max parquet size, etc.), see the "Set hudi config" section.
+
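+For illustration, here is a hedged sketch of passing custom hudi configs at create time; the table name is hypothetical, and it assumes `hoodie.*` keys can be supplied through `tblproperties` as described in the "Set hudi config" section:
+
+```sql
+-- pass additional hudi configs alongside the standard table properties
+create table hudi_custom_cfg_tbl (
+  id int,
+  name string,
+  ts bigint
+) using hudi
+tblproperties (
+  type = 'cow',
+  primaryKey = 'id',
+  preCombineField = 'ts',
+  hoodie.index.type = 'BLOOM',
+  hoodie.parquet.max.file.size = '125829120'
+);
+```
+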
</TabItem>
</Tabs>
@@ -268,26 +411,37 @@ Here we are using the default write operation : `upsert`.
If you have a workload
<TabItem value="sparksql">
```sql
-insert into hudi_table2 select 1, 'a1', 20;
+-- insert into non-partitioned table
+insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
+insert into hudi_mor_tbl select 1, 'a1', 20, 1000;
--- insert static partition
-insert into hudi_table2 partition(dt = '2021-01-02') select 1, 'a1';
+-- insert dynamic partition
+insert into hudi_cow_pt_tbl partition (dt, hh)
+select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;
--- insert overwrite table
-insert overwrite table h0 select 1, 'a1', 20;
+-- insert static partition
+insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2,
'a2', 1000;
```
**NOTICE**
+- By default, if `preCombineField` is provided, `insert into` uses `upsert` as the write operation; otherwise it uses `insert`.
+- `bulk_insert` can also be used as the write operation; it just needs two configs to be set: `hoodie.sql.bulk.insert.enable` and `hoodie.sql.insert.mode`. For example:
-- Insert mode : Hudi supports two insert modes when inserting data to a table
with primary key(we call it pk-table as followed):<br/>
- Using `strict` mode, insert statement will keep the primary key uniqueness
constraint for COW table which do not allow
- duplicate records. If a record already exists during insert, a
HoodieDuplicateKeyException will be thrown
- for COW table. For MOR table, updates are allowed to existing record.<br/>
- Using `non-strict` mode, hudi uses the same code path used by `insert`
operation in spark data source for the pk-table. <br/>
- One can set the insert mode by using the config: **hoodie.sql.insert.mode**
-
-- Bulk Insert : By default, hudi uses the normal insert operation for insert
statements. Users can set **hoodie.sql.bulk.insert.enable**
- to true to enable the bulk insert for insert statement.
+```sql
+-- upsert mode for preCombineField-provided table
+insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
+select id, name, price, ts from hudi_mor_tbl;
+1 a1_1 20.0 1001
+
+-- bulk_insert mode for preCombineField-provided table
+set hoodie.sql.bulk.insert.enable=true;
+set hoodie.sql.insert.mode=non-strict;
+
+insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
+select id, name, price, ts from hudi_mor_tbl;
+1 a1_1 20.0 1001
+1 a1_2 20.0 1002
+```
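+
+As a hedged follow-up using only the two configs shown above, you can reset them so that later insert statements fall back to the default behaviour:
+
+```sql
+-- restore the default insert behaviour for subsequent statements
+set hoodie.sql.bulk.insert.enable=false;
+set hoodie.sql.insert.mode=upsert;
+```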
</TabItem>
</Tabs>
@@ -447,6 +601,21 @@ denoted by the timestamp. Look for changes in
`_hoodie_commit_time`, `rider`, `d
Spark sql supports two kinds of DML to update hudi table: Merge-Into and
Update.
+### Update
+**Syntax**
+```sql
+UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE
boolExpression]
+```
+**Case**
+```sql
+update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
+
+update hudi_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;
+```
+:::note
+The `update` operation requires `preCombineField` to be specified.
+:::
+
### MergeInto
**Syntax**
@@ -470,57 +639,34 @@ ON <merge_condition>
```
**Example**
```sql
-merge into h0 as target
-using (
- select id, name, price, flag from s
-) source
+-- source table using hudi for testing merging into non-partitioned table
+create table merge_source (id int, name string, price double, ts bigint) using
hudi
+tblproperties (primaryKey = 'id', preCombineField = 'ts');
+insert into merge_source values (1, "old_a1", 22.22, 900), (2, "new_a2",
33.33, 2000), (3, "new_a3", 44.44, 2000);
+
+merge into hudi_mor_tbl as target
+using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;
-merge into h0
+-- source table using parquet for testing merging into partitioned table
+create table merge_source2 (id int, name string, flag string, dt string, hh
string) using parquet;
+insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'),
(2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert',
'2021-12-09', '12');
+
+merge into hudi_cow_pt_tbl as target
using (
- select id, name, price, flag from s
+ select id, name, '1000' as ts, flag, dt, hh from merge_source2
) source
-on h0.id = source.id
-when matched and flag != 'delete' then update set id = source.id, name =
source.name, price = source.price * 2
+on target.id = source.id
+when matched and flag != 'delete' then
+ update set id = source.id, name = source.name, ts = source.ts, dt =
source.dt, hh = source.hh
when matched and flag = 'delete' then delete
-when not matched then insert (id,name,price) values(id, name, price)
+when not matched then
+ insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts,
source.dt, source.hh)
;
-```
-**Notice**
-
-- The merge-on condition can be only on primary keys. Support to merge based
on other fields will be added in future.
-- Support for partial updates is supported for cow table.
-e.g.
-```sql
- merge into h0 using s0
- on h0.id = s0.id
- when matched then update set price = s0.price * 2
-```
-This works well for Cow-On-Write table which supports update based on the
**price** field.
-For Merge-on-Read table this will be supported in the future.
-- Target table's fields cannot be the right-value of the update expression for
Merge-On-Read table.
-e.g.
-```sql
- merge into h0 using s0
- on h0.id = s0.id
- when matched then update set id = s0.id,
- name = h0.name,
- price = s0.price + h0.price
-```
-This works well for Cow-On-Write table, but not yet supported for
Merge-On-Read table.
-### Update
-**Syntax**
-```sql
- UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE
boolExpression]
-```
-**Case**
-```sql
- update h0 set price = price + 20 where id = 1;
- update h0 set price = price *2, name = 'a2' where id = 2;
```
</TabItem>
@@ -579,7 +725,7 @@ val tripsIncrementalDF = spark.read.format("hudi").
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from
hudi_trips_incremental where fare > 20.0").show()
-```
+```
</TabItem>
<TabItem value="python">
@@ -723,11 +869,13 @@ Only `Append` mode is supported for delete operation.
**Syntax**
```sql
- DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
+DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
```
**Example**
```sql
-delete from h0 where id = 1;
+delete from hudi_cow_nonpcf_tbl where uuid = 1;
+
+delete from hudi_mor_tbl where id % 2 = 0;
```
</TabItem>
@@ -779,12 +927,131 @@ Only `Append` mode is supported for delete operation.
See the [deletion section](/docs/writing_data#deletes) of the writing data
page for more details.
+## Insert Overwrite
+
+Generate some new trips and overwrite all the partitions that are present in the input. This operation can be faster
+than `upsert` for batch ETL jobs that recompute entire target partitions at once (as opposed to incrementally
+updating the target tables). This is because we are able to completely bypass indexing, precombining and other
+repartitioning steps in the upsert write path.
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'SparkSQL', value: 'sparksql', },
+]}>
+<TabItem value="scala">
+
+```scala
+// spark-shell
+spark.
+ read.format("hudi").
+ load(basePath).
+ select("uuid","partitionpath").
+ sort("partitionpath","uuid").
+ show(100, false)
+
+val inserts = convertToStringList(dataGen.generateInserts(10))
+val df = spark.
+ read.json(spark.sparkContext.parallelize(inserts, 2)).
+ filter("partitionpath = 'americas/united_states/san_francisco'")
+df.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION.key(),"insert_overwrite").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(RECORDKEY_FIELD.key(), "uuid").
+ option(PARTITIONPATH_FIELD.key(), "partitionpath").
+ option(TBL_NAME.key(), tableName).
+ mode(Append).
+ save(basePath)
+
+// Should have different keys now for San Francisco alone, from query before.
+spark.
+ read.format("hudi").
+ load(basePath).
+ select("uuid","partitionpath").
+ sort("partitionpath","uuid").
+ show(100, false)
+```
+</TabItem>
+
+<TabItem value="sparksql">
+
+`insert overwrite` on a partitioned table uses the `INSERT_OVERWRITE` write operation, while on a non-partitioned table it uses `INSERT_OVERWRITE_TABLE`.
+
+```sql
+-- insert overwrite non-partitioned table
+insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
+insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;
+
+-- insert overwrite partitioned table with dynamic partition
+insert overwrite table hudi_cow_pt_tbl select 10, 'a10', 1100, '2021-12-09',
'10';
+
+-- insert overwrite partitioned table with static partition
+insert overwrite hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='12') select
13, 'a13', 1100;
+```
+</TabItem>
+</Tabs>
+
+## More Spark SQL Commands
+
+### AlterTable
+**Syntax**
+```sql
+-- Alter table name
+ALTER TABLE oldTableName RENAME TO newTableName
+
+-- Alter table add columns
+ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)
+
+-- Alter table column type
+ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType
+
+-- Alter table properties
+ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')
+```
+**Examples**
+```sql
+--rename to:
+ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;
+
+--add column:
+ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);
+
+--change column:
+ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;
+
+--set properties:
+alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits =
'10');
+```
+### Partition SQL Command
+**Syntax**
+
+```sql
+-- Drop Partition
+ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name =
partition_col_val [ , ... ] )
+
+-- Show Partitions
+SHOW PARTITIONS tableIdentifier
+```
+**Examples**
+```sql
+--show partition:
+show partitions hudi_cow_pt_tbl;
+
+--drop partition:
+alter table hudi_cow_pt_tbl drop partition (dt='2021-12-09', hh='10');
+```
+:::note
+Currently, the result of `show partitions` is based on the filesystem table path, so it is not precise after deleting all the data in a partition or dropping a partition directly.
+
+:::
## Where to go from here?
You can also do the quickstart by [building hudi
yourself](https://github.com/apache/hudi#building-apache-hudi-from-source),
and using `--jars <path to
hudi_code>/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.1?-*.*.*-SNAPSHOT.jar`
in the spark-shell command above
-instead of `--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0`. Hudi
also supports scala 2.12. Refer [build with scala
2.12](https://github.com/apache/hudi#build-with-scala-212)
+instead of `--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.10.0`. Hudi
also supports scala 2.12. Refer [build with scala
2.12](https://github.com/apache/hudi#build-with-scala-212)
for more info.
Also, we used Spark here to showcase the capabilities of Hudi. However, Hudi
can support multiple table types/query types and