This is an automated email from the ASF dual-hosted git repository.

lamberken pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 531b42a  [HUDI-783] Add pyspark example in quickstart (#1526)
531b42a is described below

commit 531b42a529b7a2f248c062c1e5b351e79239baca
Author: Edwin Guo <[email protected]>
AuthorDate: Sun May 3 20:18:17 2020 -0400

    [HUDI-783] Add pyspark example in quickstart (#1526)
    
    * [HUDI-783] Pyspark Docs - Add Hudi pyspark insert example.
    
    * [HUDI-783] Pyspark Docs - Add Hudi pyspark query and update example.
    
    * [HUDI-783] Pyspark Docs - Add Hudi pyspark incremental example.
    
    * Pyspark Docs - Add Hudi pyspark delete example.
    
    * [HUDI-783] Pyspark Docs - Fix syntax issue
    
    * [HUDI-1526] Address PR comments.
    
    * [HUDI-1526] Pyspark Docs - Fix syntax issue
    
    * [HUDI-783] Pyspark Docs - Update Formatting.
    
    * [HUDI-1526] Reformatted the docs
    
    * [HUDI-1526] Address pr docs comments.
    
    * [HUDI-783] Add pyspark example in quickstart #1526
---
 docs/_docs/1_1_quick_start_guide.md | 236 +++++++++++++++++++++++++++++++++++-
 1 file changed, 233 insertions(+), 3 deletions(-)

diff --git a/docs/_docs/1_1_quick_start_guide.md b/docs/_docs/1_1_quick_start_guide.md
index 08269ec..3e088dd 100644
--- a/docs/_docs/1_1_quick_start_guide.md
+++ b/docs/_docs/1_1_quick_start_guide.md
@@ -9,13 +9,15 @@ This guide provides a quick peek at Hudi's capabilities using spark-shell. Using
 code snippets that allows you to insert and update a Hudi table of default table type: 
 [Copy on Write](/docs/concepts.html#copy-on-write-table). 
 After each write operation we will also show how to read the data both snapshot and incrementally.
+# Scala example
 
-## Setup spark-shell
+## Setup
 
 Hudi works with Spark-2.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. 
 From the extracted directory run spark-shell with Hudi as:
 
 ```scala
+// spark-shell
 spark-2.4.4-bin-hadoop2.7/bin/spark-shell \
   --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
@@ -34,6 +36,7 @@ spark-2.4.4-bin-hadoop2.7/bin/spark-shell \
 Setup table name, base path and a data generator to generate records for this guide.
 
 ```scala
+// spark-shell
 import org.apache.hudi.QuickstartUtils._
 import scala.collection.JavaConversions._
 import org.apache.spark.sql.SaveMode._
@@ -56,6 +59,7 @@ can generate sample inserts and updates based on the the sample trip schema [her
 Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
 
 ```scala
+// spark-shell
 val inserts = convertToStringList(dataGen.generateInserts(10))
 val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
 df.write.format("hudi").
@@ -77,12 +81,13 @@ and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/d
 Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue 
 `insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
 {: .notice--info}
- 
+
 ## Query data 
 
 Load the data files into a DataFrame.
 
 ```scala
+// spark-shell
 val tripsSnapshotDF = spark.
   read.
   format("hudi").
@@ -104,6 +109,7 @@ This is similar to inserting new data. Generate updates to existing trips using
 and write DataFrame into the hudi table.
 
 ```scala
+// spark-shell
 val updates = convertToStringList(dataGen.generateUpdates(10))
 val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
 df.write.format("hudi").
@@ -128,6 +134,7 @@ This can be achieved using Hudi's incremental querying and providing a begin tim
 We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
 
 ```scala
+// spark-shell
 // reload data
 spark.
   read.
@@ -158,6 +165,7 @@ Lets look at how to query data as of a specific time. The specific time can be r
 specific commit time and beginTime to "000" (denoting earliest possible commit time). 
 
 ```scala
+// spark-shell
 val beginTime = "000" // Represents all commits > this time.
 val endTime = commits(commits.length - 2) // commit time we are interested in
 
@@ -168,13 +176,14 @@ val tripsPointInTimeDF = spark.read.format("hudi").
   option(END_INSTANTTIME_OPT_KEY, endTime).
   load(basePath)
 tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
-spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_point_in_time where fare > 20.0").show()
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
 ```
 
 ## Delete data {#deletes}
 Delete records for the HoodieKeys passed in.
 
 ```scala
+// spark-shell
 // fetch total records count
 spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 // fetch two records to be deleted
@@ -204,6 +213,227 @@ spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
 ```
 Note: Only `Append` mode is supported for delete operation.
 
+# Pyspark example
+## Setup
+
+Hudi works with Spark-2.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. 
+From the extracted directory run pyspark with Hudi as:
+
+```python
+# pyspark
+export PYSPARK_PYTHON=$(which python3)
+spark-2.4.4-bin-hadoop2.7/bin/pyspark \
+  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+```
+
+<div class="notice--info">
+  <h4>Please note the following: </h4>
+<ul>
+  <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li>
+  <li>spark-avro and spark versions must match (we have used 2.4.4 for both above)</li>
+  <li>we have used hudi-spark-bundle built for scala 2.11 since the spark-avro module used also depends on 2.11. 
+         If spark-avro_2.12 is used, correspondingly hudi-spark-bundle_2.12 needs to be used. </li>
+</ul>
+</div>
+
+Setup table name, base path and a data generator to generate records for this guide.
+
+```python
+# pyspark
+tableName = "hudi_trips_cow"
+basePath = "file:///tmp/hudi_trips_cow"
+dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+```
+
+The [DataGenerator](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L50) 
+can generate sample inserts and updates based on the sample trip schema [here](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
+{: .notice--info}
+
+
+## Insert data
+
+Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
+
+```python
+# pyspark
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+  'hoodie.table.name': tableName,
+  'hoodie.datasource.write.recordkey.field': 'uuid',
+  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+  'hoodie.datasource.write.table.name': tableName,
+  'hoodie.datasource.write.operation': 'insert',
+  'hoodie.datasource.write.precombine.field': 'ts',
+  'hoodie.upsert.shuffle.parallelism': 2, 
+  'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("overwrite"). \
+  save(basePath)
+```
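A note on the snippet above: `options(**hudi_options)` works because Python allows a dict to be unpacked into a function's `**kwargs` even when the keys (like `hoodie.table.name`) are not valid identifiers. A plain-Python sketch, with a hypothetical stand-in writer instead of Spark's real `DataFrameWriter`:

```python
# Sketch only: FakeWriter is a hypothetical stand-in for a Spark DataFrameWriter,
# used to show how options(**some_dict) expands into repeated option() calls.
class FakeWriter:
    def __init__(self):
        self.opts = {}

    def option(self, key, value):
        # Spark stores each option as a string key/value pair
        self.opts[str(key)] = str(value)
        return self

    def options(self, **kwargs):
        # **kwargs accepts dotted keys like 'hoodie.table.name' when
        # unpacked from a dict with the ** operator
        for key, value in kwargs.items():
            self.option(key, value)
        return self

hudi_options = {
    'hoodie.table.name': 'hudi_trips_cow',
    'hoodie.upsert.shuffle.parallelism': 2,
}

w = FakeWriter().options(**hudi_options)
print(w.opts['hoodie.table.name'])              # hudi_trips_cow
print(w.opts['hoodie.upsert.shuffle.parallelism'])  # 2
```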
+
+`mode(Overwrite)` overwrites and recreates the table if it already exists.
+You can check the data generated under `/tmp/hudi_trips_cow/<region>/<country>/<city>/`. We provided a record key 
+(`uuid` in [schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/country/city`) and combine logic (`ts` in 
+[schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to 
+[Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi)
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html).
+Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue 
+`insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
+{: .notice--info}
+
+## Query data 
+
+Load the data files into a DataFrame.
+
+```python
+# pyspark
+tripsSnapshotDF = spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*")
+
+tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
+spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
+```
+
+This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested 
+from base path we've used `load(basePath + "/*/*/*/*")`. 
+Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported.
+{: .notice--info}
+
+## Update data
+
+This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame 
+and write DataFrame into the hudi table.
+
+```python
+# pyspark
+updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
+df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi"). \
+  options(**hudi_options). \
+  mode("append"). \
+  save(basePath)
+```
+
+Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time.
+[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/docs/concepts.html) 
+denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in previous commit. 
+{: .notice--info}
+
+## Incremental query
+
+Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. 
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. 
+We do not need to specify endTime, if we want all changes after the given commit (as is the common case). 
+
+```python
+# pyspark
+# reload data
+spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*"). \
+  createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.begin.instanttime': beginTime,
+}
+
+tripsIncrementalDF = spark.read.format("hudi"). \
+  options(**incremental_read_options). \
+  load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+```
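The index arithmetic above, `commits[len(commits) - 2]`, picks the second-to-last commit as the begin time, so the incremental query covers only records written by commits strictly after it (i.e. the latest commit). A plain-Python sketch with hypothetical commit timestamps, no Spark needed:

```python
# Hypothetical, sorted commit times as returned by the distinct/order-by query
commits = ['20200503201817', '20200503202230', '20200503202901']

# Second-to-last commit becomes the (exclusive) begin instant
beginTime = commits[len(commits) - 2]

# The incremental query surfaces commits strictly greater than beginTime
changed_since = [c for c in commits if c > beginTime]

print(beginTime)      # 20200503202230
print(changed_since)  # ['20200503202901']
```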
+
+This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
+{: .notice--info}
+
+## Point in time query
+
+Let's look at how to query data as of a specific time. The specific time can be represented by pointing endTime to a 
+specific commit time and beginTime to "000" (denoting earliest possible commit time). 
+
+```python
+# pyspark
+beginTime = "000" # Represents all commits > this time.
+endTime = commits[len(commits) - 2]
+
+# query point in time data
+point_in_time_read_options = {
+  'hoodie.datasource.query.type': 'incremental',
+  'hoodie.datasource.read.end.instanttime': endTime,
+  'hoodie.datasource.read.begin.instanttime': beginTime
+}
+
+tripsPointInTimeDF = spark.read.format("hudi"). \
+  options(**point_in_time_read_options). \
+  load(basePath)
+
+tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
+```
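Setting beginTime to "000" makes the lower bound earlier than any real commit, so the window covers everything up to and including endTime. A plain-Python sketch of that window, with hypothetical commit timestamps:

```python
# Hypothetical, sorted commit times
commits = ['20200503201817', '20200503202230', '20200503202901']

beginTime = "000"                      # earlier than any possible commit
endTime = commits[len(commits) - 2]    # second-to-last commit

# A point-in-time query sees commits in (beginTime, endTime]:
# exclusive of the begin instant, inclusive of the end instant
visible = [c for c in commits if beginTime < c <= endTime]

print(visible)  # ['20200503201817', '20200503202230']
```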
+
+## Delete data {#deletes}
+Delete records for the HoodieKeys passed in.
+
+Note: Only `Append` mode is supported for delete operation.
+
+```python
+# pyspark
+# fetch total records count
+spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
+# fetch two records to be deleted
+ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
+
+# issue deletes
+hudi_delete_options = {
+  'hoodie.table.name': tableName,
+  'hoodie.datasource.write.recordkey.field': 'uuid',
+  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+  'hoodie.datasource.write.table.name': tableName,
+  'hoodie.datasource.write.operation': 'delete',
+  'hoodie.datasource.write.precombine.field': 'ts',
+  'hoodie.upsert.shuffle.parallelism': 2, 
+  'hoodie.insert.shuffle.parallelism': 2
+}
+
+from pyspark.sql.functions import lit
+deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
+df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
+df.write.format("hudi"). \
+  options(**hudi_delete_options). \
+  mode("append"). \
+  save(basePath)
+
+# run the same read query as above.
+roAfterDeleteViewDF = spark. \
+  read. \
+  format("hudi"). \
+  load(basePath + "/*/*/*/*") 
+roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
+# fetch should return (total - 2) records
+spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
+```
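The delete payload only needs each record's key and partition path, plus a dummy precombine value (the `lit(0.0)` column named `ts`). A plain-Python sketch of that row shaping, using hypothetical uuid/partition values in place of real query results:

```python
# Hypothetical stand-ins for the Row objects returned by ds.collect():
# each behaves like a (uuid, partitionPath) pair
rows = [('uuid-1', 'americas/united_states/san_francisco'),
        ('uuid-2', 'americas/brazil/sao_paulo')]

# Same mapping as the pyspark snippet: keep only key and partition path
deletes = list(map(lambda row: (row[0], row[1]), rows))

# Each delete record also carries a dummy precombine field ('ts'),
# mirroring withColumn('ts', lit(0.0))
payload = [{'uuid': u, 'partitionpath': p, 'ts': 0.0} for (u, p) in deletes]

print(len(payload))        # 2
print(payload[0]['uuid'])  # uuid-1
```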
+
+
+
 ## Where to go from here?
 
 You can also do the quickstart by [building hudi yourself](https://github.com/apache/incubator-hudi#building-apache-hudi-from-source), 
