nsivabalan commented on code in PR #9712:
URL: https://github.com/apache/hudi/pull/9712#discussion_r1326568180


##########
website/docs/quick-start-guide.md:
##########
@@ -1538,174 +998,205 @@ values={[
 <TabItem value="scala">
 
 ```scala
-// spark-shell
-spark.
-  read.format("hudi").
-  load(basePath).
-  select("uuid","partitionpath").
-  sort("partitionpath","uuid").
-  show(100, false)
+spark.read.
+  format("hudi").
+  option("as.of.instant", "20210728141108100").
+  load(basePath)
 
-val inserts = convertToStringList(dataGen.generateInserts(10))
-val df = spark.
-  read.json(spark.sparkContext.parallelize(inserts, 2)).
-  filter("partitionpath = 'americas/united_states/san_francisco'")
-df.write.format("hudi").
-  options(getQuickstartWriteConfigs).
-  option(OPERATION.key(),"insert_overwrite").
-  option(PRECOMBINE_FIELD.key(), "ts").
-  option(RECORDKEY_FIELD.key(), "uuid").
-  option(PARTITIONPATH_FIELD.key(), "partitionpath").
-  option(TBL_NAME.key(), tableName).
-  mode(Append).
-  save(basePath)
+spark.read.
+  format("hudi").
+  option("as.of.instant", "2021-07-28 14:11:08.200").
+  load(basePath)
+
+// It is equal to "as.of.instant = 2021-07-28 00:00:00"
+spark.read.
+  format("hudi").
+  option("as.of.instant", "2021-07-28").
+  load(basePath)
 
-// Should have different keys now for San Francisco alone, from query before.
-spark.
-  read.format("hudi").
-  load(basePath).
-  select("uuid","partitionpath").
-  sort("partitionpath","uuid").
-  show(100, false)
 ```
-</TabItem>
 
+</TabItem>
 <TabItem value="python">
 
 ```python
-# pyspark
-spark.read.format("hudi"). \
-    load(basePath). \
-    select(["uuid", "partitionpath"]). \
-    sort(["partitionpath", "uuid"]). \
-    show(n=100, truncate=False)
-    
-inserts = 
sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
 
-df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)). \
-    filter("partitionpath = 'americas/united_states/san_francisco'")
-hudi_insert_overwrite_options = {
-    'hoodie.table.name': tableName,
-    'hoodie.datasource.write.recordkey.field': 'uuid',
-    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
-    'hoodie.datasource.write.table.name': tableName,
-    'hoodie.datasource.write.operation': 'insert_overwrite',
-    'hoodie.datasource.write.precombine.field': 'ts',
-    'hoodie.upsert.shuffle.parallelism': 2,
-    'hoodie.insert.shuffle.parallelism': 2
-}
-df.write.format("hudi").options(**hudi_insert_overwrite_options).mode("append").save(basePath)
-spark.read.format("hudi"). \
-    load(basePath). \
-    select(["uuid", "partitionpath"]). \
-    sort(["partitionpath", "uuid"]). \
-    show(n=100, truncate=False)
+#pyspark
+spark.read. \
+  format("hudi"). \
+  option("as.of.instant", "20210728141108"). \
+  load(basePath)
+
+spark.read. \
+  format("hudi"). \
+  option("as.of.instant", "2021-07-28 14:11:08.000"). \
+  load(basePath)
+
+# It is equal to "as.of.instant = 2021-07-28 00:00:00"
+spark.read. \
+  format("hudi"). \
+  option("as.of.instant", "2021-07-28"). \
+  load(basePath)
 ```
-</TabItem>
 
+</TabItem>
 <TabItem value="sparksql">
 
-`insert overwrite` a partitioned table use the `INSERT_OVERWRITE` type of 
write operation, while a non-partitioned table to `INSERT_OVERWRITE_TABLE`.
+:::note
+Requires Spark 3.2+
+:::
 
 ```sql
--- insert overwrite non-partitioned table
-insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
-insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;
 
--- insert overwrite partitioned table with dynamic partition
-insert overwrite table hudi_cow_pt_tbl select 10, 'a10', 1100, '2021-12-09', 
'10';
-
--- insert overwrite partitioned table with static partition
-insert overwrite hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='12') select 
13, 'a13', 1100;
+-- time travel based on commit time, for eg: `20220307091628793`
+select * from hudi_cow_pt_tbl timestamp as of '20220307091628793' where id = 1;
+-- time travel based on different timestamp formats
+select * from hudi_cow_pt_tbl timestamp as of '2022-03-07 09:16:28.100' where 
id = 1;
+select * from hudi_cow_pt_tbl timestamp as of '2022-03-08' where id = 1;
 ```
+
 </TabItem>
 
 </Tabs
 >
 
-## More Spark SQL Commands
 
-### Alter Table
+### Incremental query
 
-Schema evolution can be achieved via `ALTER TABLE` commands. Below shows some 
basic examples.
+Hudi also provides the capability to obtain a stream of records that changed since 
a given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin 
time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given 
commit (as is the common case). 
 
-:::note
-For more detailed examples, please prefer to [schema 
evolution](schema_evolution)
-:::
+<Tabs
+groupId="programming-language"
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+{ label: 'Spark SQL', value: 'sparksql', }
+]}
+>
 
-**Syntax**
-```sql
--- Alter table name
-ALTER TABLE oldTableName RENAME TO newTableName
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// reload data
+spark.
+  read.
+  format("hudi").
+  load(basePath).
+  createOrReplaceTempView("hudi_trips_snapshot")
 
--- Alter table add columns
-ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)
+val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime 
from  hudi_trips_snapshot order by commitTime").map(k => 
k.getString(0)).take(50)
+val beginTime = commits(commits.length - 2) // commit time we are interested in
 
--- Alter table column type
-ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType
+// incrementally query data
+val tripsIncrementalDF = spark.read.format("hudi").
+  option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL).
+  option(BEGIN_INSTANTTIME.key(), beginTime).
+  load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
 
--- Alter table properties
-ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  
hudi_trips_incremental where fare > 20.0").show()
 ```
-**Examples**
-```sql
---rename to:
-ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;
 
---add column:
-ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select 
distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by 
commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': beginTime,
+}
 
---change column:
-ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;
+tripsIncrementalDF = spark.read.format("hudi"). \
+  options(**incremental_read_options). \
+  load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
 
---set properties;
-alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = 
'10');
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  
hudi_trips_incremental where fare > 20.0").show()
 ```
 
-### Partition SQL Command
+</TabItem>
 
-**Syntax**
+<TabItem value="sparksql">
 
 ```sql
--- Add Partition
-ALTER TABLE tableIdentifier ADD PARTITION ( partition_col_name = 
partition_col_val [ , ... ] )
+-- syntax
+hudi_table_changes(table or path, queryType, beginTime [, endTime]);  
+-- table or path: table identifier, example: db.tableName, tableName, 
+--                or path of your table, example: path/to/hudiTable  
+--                in this case table does not need to exist in the metastore,
+-- queryType: incremental query mode, example: latest_state, cdc  
+--            (for cdc query, first enable cdc for your table by setting 
cdc.enabled=true),
+-- beginTime: instantTime to begin query from, example: earliest, 
202305150000, 
+-- endTime: optional instantTime to end query at, example: 202305160000, 
 
--- Drop Partition
-ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = 
partition_col_val [ , ... ] )
+-- incrementally query data by table name
+-- start from earliest available commit, end at latest available commit.  
+select * from hudi_table_changes('db.table', 'latest_state', 'earliest');
 
--- Show Partitions
-SHOW PARTITIONS tableIdentifier
-```
-**Examples**
-```sql
---show partition:
-show partitions hudi_cow_pt_tbl;
+-- start from earliest, end at 202305160000.  
+select * from hudi_table_changes('table', 'latest_state', 'earliest', 
'202305160000');  
+
+-- start from 202305150000, end at 202305160000.
+select * from hudi_table_changes('table', 'latest_state', '202305150000', 
'202305160000');
+
+-- incrementally query data by path
+-- start from earliest available commit, end at latest available commit.
+select * from hudi_table_changes('path/to/table', 'cdc', 'earliest');
+
+-- start from earliest, end at 202305160000.
+select * from hudi_table_changes('path/to/table', 'cdc', 'earliest', 
'202305160000');
+
+-- start from 202305150000, end at 202305160000.
+select * from hudi_table_changes('path/to/table', 'cdc', '202305150000', 
'202305160000');
 
---drop partition:
-alter table hudi_cow_pt_tbl drop partition (dt='2021-12-09', hh='10');
 ```
-:::note
-Currently,  the result of `show partitions` is based on the filesystem table 
path. It's not precise when delete the whole partition data or drop certain 
partition directly.
 
+</TabItem>
+
+</Tabs
+>
+
+:::info
+This will give all changes that happened after the beginTime commit with the 
filter of fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
 :::
 
-### Procedures
+## Streaming writers
+### Hudi Streamer
+Hudi provides an ingestion tool to assist with ingesting data into Hudi from 
various different sources in a streaming manner. 
+This has a lot of niceties like auto checkpointing, schema enforcement via 
schema provider, transformation support and so on.
+Please refer to [here](/docs/next/hoodie_deltastreamer#hudi-streamer) for more 
info. 
 
-**Syntax**
-```sql
---Call procedure by positional arguments
-CALL system.procedure_name(arg_1, arg_2, ... arg_n)
+### Structured Streaming
 
---Call procedure by named arguments
-CALL system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1, ... 
arg_name_n => arg_n)
-```
-**Examples**
-```sql
---show commit's info
-call show_commits(table => 'test_hudi_table', limit => 10);
-```
+Hudi supports Spark Structured Streaming reads and writes as well. Please 
refer to [here](/docs/next/toBeFixed) for more info.

Review Comment:
   yes, once we land this https://github.com/apache/hudi/pull/9701, will update 
it here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to