nsivabalan commented on code in PR #9712:
URL: https://github.com/apache/hudi/pull/9712#discussion_r1326568180
##########
website/docs/quick-start-guide.md:
##########
@@ -1538,174 +998,205 @@ values={[
<TabItem value="scala">
```scala
-// spark-shell
-spark.
- read.format("hudi").
- load(basePath).
- select("uuid","partitionpath").
- sort("partitionpath","uuid").
- show(100, false)
+spark.read.
+ format("hudi").
+ option("as.of.instant", "20210728141108100").
+ load(basePath)
-val inserts = convertToStringList(dataGen.generateInserts(10))
-val df = spark.
- read.json(spark.sparkContext.parallelize(inserts, 2)).
- filter("partitionpath = 'americas/united_states/san_francisco'")
-df.write.format("hudi").
- options(getQuickstartWriteConfigs).
- option(OPERATION.key(),"insert_overwrite").
- option(PRECOMBINE_FIELD.key(), "ts").
- option(RECORDKEY_FIELD.key(), "uuid").
- option(PARTITIONPATH_FIELD.key(), "partitionpath").
- option(TBL_NAME.key(), tableName).
- mode(Append).
- save(basePath)
+spark.read.
+ format("hudi").
+ option("as.of.instant", "2021-07-28 14:11:08.200").
+ load(basePath)
+
+// It is equal to "as.of.instant = 2021-07-28 00:00:00"
+spark.read.
+ format("hudi").
+ option("as.of.instant", "2021-07-28").
+ load(basePath)
-// Should have different keys now for San Francisco alone, from query before.
-spark.
- read.format("hudi").
- load(basePath).
- select("uuid","partitionpath").
- sort("partitionpath","uuid").
- show(100, false)
```
-</TabItem>
+</TabItem>
<TabItem value="python">
```python
-# pyspark
-spark.read.format("hudi"). \
- load(basePath). \
- select(["uuid", "partitionpath"]). \
- sort(["partitionpath", "uuid"]). \
- show(n=100, truncate=False)
-
-inserts =
sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
-df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)). \
- filter("partitionpath = 'americas/united_states/san_francisco'")
-hudi_insert_overwrite_options = {
- 'hoodie.table.name': tableName,
- 'hoodie.datasource.write.recordkey.field': 'uuid',
- 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
- 'hoodie.datasource.write.table.name': tableName,
- 'hoodie.datasource.write.operation': 'insert_overwrite',
- 'hoodie.datasource.write.precombine.field': 'ts',
- 'hoodie.upsert.shuffle.parallelism': 2,
- 'hoodie.insert.shuffle.parallelism': 2
-}
-df.write.format("hudi").options(**hudi_insert_overwrite_options).mode("append").save(basePath)
-spark.read.format("hudi"). \
- load(basePath). \
- select(["uuid", "partitionpath"]). \
- sort(["partitionpath", "uuid"]). \
- show(n=100, truncate=False)
+#pyspark
+spark.read. \
+ format("hudi"). \
+ option("as.of.instant", "20210728141108"). \
+ load(basePath)
+
+spark.read. \
+ format("hudi"). \
+ option("as.of.instant", "2021-07-28 14:11:08.000"). \
+ load(basePath)
+
+# It is equal to "as.of.instant = 2021-07-28 00:00:00"
+spark.read. \
+ format("hudi"). \
+ option("as.of.instant", "2021-07-28"). \
+ load(basePath)
```
-</TabItem>
+</TabItem>
<TabItem value="sparksql">
-`insert overwrite` a partitioned table use the `INSERT_OVERWRITE` type of
write operation, while a non-partitioned table to `INSERT_OVERWRITE_TABLE`.
+:::note
+Requires Spark 3.2+
+:::
```sql
--- insert overwrite non-partitioned table
-insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
-insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;
--- insert overwrite partitioned table with dynamic partition
-insert overwrite table hudi_cow_pt_tbl select 10, 'a10', 1100, '2021-12-09',
'10';
-
--- insert overwrite partitioned table with static partition
-insert overwrite hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='12') select
13, 'a13', 1100;
+-- time travel based on commit time, for eg: `20220307091628793`
+select * from hudi_cow_pt_tbl timestamp as of '20220307091628793' where id = 1;
+-- time travel based on different timestamp formats
+select * from hudi_cow_pt_tbl timestamp as of '2022-03-07 09:16:28.100' where
id = 1;
+select * from hudi_cow_pt_tbl timestamp as of '2022-03-08' where id = 1;
```
+
</TabItem>
</Tabs
>
-## More Spark SQL Commands
-### Alter Table
+### Incremental query
-Schema evolution can be achieved via `ALTER TABLE` commands. Below shows some
basic examples.
+Hudi also provides the capability to obtain a stream of records that changed
since a given commit timestamp.
+This can be achieved using Hudi's incremental querying and providing a begin
time from which changes need to be streamed.
+We do not need to specify endTime if we want all changes after the given
commit (as is the common case).
-:::note
-For more detailed examples, please prefer to [schema
evolution](schema_evolution)
-:::
+<Tabs
+groupId="programming-language"
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+{ label: 'Spark SQL', value: 'sparksql', }
+]}
+>
-**Syntax**
-```sql
--- Alter table name
-ALTER TABLE oldTableName RENAME TO newTableName
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// reload data
+spark.
+ read.
+ format("hudi").
+ load(basePath).
+ createOrReplaceTempView("hudi_trips_snapshot")
--- Alter table add columns
-ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)
+val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime
from hudi_trips_snapshot order by commitTime").map(k =>
k.getString(0)).take(50)
+val beginTime = commits(commits.length - 2) // commit time we are interested in
--- Alter table column type
-ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType
+// incrementally query data
+val tripsIncrementalDF = spark.read.format("hudi").
+ option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL).
+ option(BEGIN_INSTANTTIME.key(), beginTime).
+ load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
--- Alter table properties
-ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from
hudi_trips_incremental where fare > 20.0").show()
```
-**Examples**
-```sql
---rename to:
-ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;
---add column:
-ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# reload data
+spark. \
+ read. \
+ format("hudi"). \
+ load(basePath). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select
distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by
commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+ 'hoodie.datasource.query.type': 'incremental',
+ 'hoodie.datasource.read.begin.instanttime': beginTime,
+}
---change column:
-ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;
+tripsIncrementalDF = spark.read.format("hudi"). \
+ options(**incremental_read_options). \
+ load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
---set properties;
-alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits =
'10');
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from
hudi_trips_incremental where fare > 20.0").show()
```
-### Partition SQL Command
+</TabItem>
-**Syntax**
+<TabItem value="sparksql">
```sql
--- Add Partition
-ALTER TABLE tableIdentifier ADD PARTITION ( partition_col_name =
partition_col_val [ , ... ] )
+-- syntax
+hudi_table_changes(table or path, queryType, beginTime [, endTime]);
+-- table or path: table identifier, example: db.tableName, tableName,
+-- or path of your table, example: path/to/hudiTable
+-- in this case table does not need to exist in the metastore,
+-- queryType: incremental query mode, example: latest_state, cdc
+-- (for cdc query, first enable cdc for your table by setting
cdc.enabled=true),
+-- beginTime: instantTime to begin query from, example: earliest,
202305150000,
+-- endTime: optional instantTime to end query at, example: 202305160000,
--- Drop Partition
-ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name =
partition_col_val [ , ... ] )
+-- incrementally query data by table name
+-- start from earliest available commit, end at latest available commit.
+select * from hudi_table_changes('db.table', 'latest_state', 'earliest');
--- Show Partitions
-SHOW PARTITIONS tableIdentifier
-```
-**Examples**
-```sql
---show partition:
-show partitions hudi_cow_pt_tbl;
+-- start from earliest, end at 202305160000.
+select * from hudi_table_changes('table', 'latest_state', 'earliest',
'202305160000');
+
+-- start from 202305150000, end at 202305160000.
+select * from hudi_table_changes('table', 'latest_state', '202305150000',
'202305160000');
+
+-- incrementally query data by path
+-- start from earliest available commit, end at latest available commit.
+select * from hudi_table_changes('path/to/table', 'cdc', 'earliest');
+
+-- start from earliest, end at 202305160000.
+select * from hudi_table_changes('path/to/table', 'cdc', 'earliest',
'202305160000');
+
+-- start from 202305150000, end at 202305160000.
+select * from hudi_table_changes('path/to/table', 'cdc', '202305150000',
'202305160000');
---drop partition:
-alter table hudi_cow_pt_tbl drop partition (dt='2021-12-09', hh='10');
```
-:::note
-Currently, the result of `show partitions` is based on the filesystem table
path. It's not precise when delete the whole partition data or drop certain
partition directly.
+</TabItem>
+
+</Tabs
+>
+
+:::info
+This will give all changes that happened after the beginTime commit with the
filter of fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
:::
-### Procedures
+## Streaming writers
+### Hudi Streamer
+Hudi provides an ingestion tool to assist with ingesting data into Hudi from
various different sources in a streaming manner.
+This has a lot of niceties like auto checkpointing, schema enforcement via a
schema provider, transformation support and so on.
+Please refer to [here](/docs/next/hoodie_deltastreamer#hudi-streamer) for more
info.
-**Syntax**
-```sql
---Call procedure by positional arguments
-CALL system.procedure_name(arg_1, arg_2, ... arg_n)
+### Structured Streaming
---Call procedure by named arguments
-CALL system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1, ...
arg_name_n => arg_n)
-```
-**Examples**
-```sql
---show commit's info
-call show_commits(table => 'test_hudi_table', limit => 10);
-```
+Hudi supports Spark Structured Streaming reads and writes as well. Please
refer to [here](/docs/next/toBeFixed) for more info.
Review Comment:
yes, once we land this https://github.com/apache/hudi/pull/9701, will update
it here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]