This is an automated email from the ASF dual-hosted git repository.
lamberken pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 531b42a [HUDI-783] Add pyspark example in quickstart (#1526)
531b42a is described below
commit 531b42a529b7a2f248c062c1e5b351e79239baca
Author: Edwin Guo
AuthorDate: Sun May 3 20:18:17 2020 -0400
[HUDI-783] Add pyspark example in quickstart (#1526)
* [HUDI-783] Pyspark Docs - Add Hudi pyspark insert example.
* [HUDI-783] Pyspark Docs - Add Hudi pyspark query and update example.
* [HUDI-783] Pyspark Docs - Add Hudi pyspark incremental example.
* Pyspark Docs - Add Hudi pyspark delete example.
* [HUDI-783] Pyspark Docs - Fix syntax issue
* [HUDI-1526] Address PR comments.
* [HUDI-1526] Pyspark Docs - Fix syntax issue
* [HUDI-783] Pyspark Docs - Update Formatting.
* [HUDI-1526] Reformatted the docs
* [HUDI-1526] Address pr docs comments.
* [HUDI-783] Add pyspark example in quickstart #1526
---
docs/_docs/1_1_quick_start_guide.md | 236 +++++++++++++++++++++++++++++++++++-
1 file changed, 233 insertions(+), 3 deletions(-)
diff --git a/docs/_docs/1_1_quick_start_guide.md
b/docs/_docs/1_1_quick_start_guide.md
index 08269ec..3e088dd 100644
--- a/docs/_docs/1_1_quick_start_guide.md
+++ b/docs/_docs/1_1_quick_start_guide.md
@@ -9,13 +9,15 @@ This guide provides a quick peek at Hudi's capabilities using spark-shell. Using
code snippets that allow you to insert and update a Hudi table of default table type:
[Copy on Write](/docs/concepts.html#copy-on-write-table).
After each write operation we will also show how to read the data both as a snapshot and incrementally.
+# Scala example
-## Setup spark-shell
+## Setup
Hudi works with Spark-2.x versions. You can follow the instructions [here](https://spark.apache.org/downloads.html) for setting up Spark.
From the extracted directory run spark-shell with Hudi as:
```scala
+// spark-shell
spark-2.4.4-bin-hadoop2.7/bin/spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
@@ -34,6 +36,7 @@ spark-2.4.4-bin-hadoop2.7/bin/spark-shell \
Set up the table name, base path and a data generator to generate records for this guide.
```scala
+// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
@@ -56,6 +59,7 @@ can generate sample inserts and updates based on the the sample trip schema [her
Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
```scala
+// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
@@ -77,12 +81,13 @@ and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/d
Here we are using the default write operation, `upsert`. If you have a workload without updates, you can also issue
`insert` or `bulk_insert` operations, which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
{: .notice--info}
-
+
## Query data
Load the data files into a DataFrame.
```scala
+// spark-shell
val tripsSnapshotDF = spark.
read.
format("hudi").
@@ -104,6 +109,7 @@ This is similar to inserting new data. Generate updates to existing trips using
and write the DataFrame into the Hudi table.
```scala
+// spark-shell
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
@@ -128,6 +134,7 @@ This can be achieved using Hudi's incremental querying and providing a begin tim
We do not need to specify endTime if we want all changes after the given commit (as is the common case).
```scala
+// spark-shell
// reload data
spark.
read.
@@ -158,6 +165,7 @@ Lets look at how to query data as of a specific time. The specific time can be r
specific commit time and beginTime to "000" (denoting the earliest possible commit time).
```scala
+// spark-shell
val beginTime = "000" // Represents all commits > this time.
val endTime = commits(commits.length - 2) // commit time we are interested in
@@ -168,13 +176,14 @@ val tripsPointInTimeDF = spark.read.format("hudi").
option(END_INSTANTTIME_OPT_KEY, endTime).
load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
-spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
```
## Delete data {#deletes}
Delete records for the HoodieKeys passed in.
```scala
+// spark-shell
// fetch total records count
spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
// fetch two records to be deleted
@@ -204,6 +213,227 @@ spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
```
Note: Only `Append` mode is supported for the delete operation.
+# Pyspark example
+## Setup
+
+Hudi works with Spark-2.x versions. You can follow the instructions [here](https://spark.apache.org/downloads.html) for setting up Spark.
+From the extracted directory run pyspark with Hudi as:
+
+```python
+# pyspark
+export PYSPARK_PYTHON=$(which python3)
+spark-2.4.4-bin-hadoop2.7/bin/pyspark \
+ --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
+ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
+```
+
+<div class="notice--info">
+ <h4>Please note the following: </h4>
+<ul>
+ <li>spark-avro module needs to be specified in --packages as it is not included with pyspark by default</li>
+ <li>spark-avro and spark versions must match (we have used 2.4.4 for both above)</li>
+ <li>we have used hudi-spark-bundle built for scala 2.11 since the spark-avro module used also depends on 2.11.
+ If spark-avro_2.12 is used, correspondingly hudi-spark-bundle_2.12 needs to be used. </li>
+</ul>
+</div>
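
The version rules in the notice above can be checked before launching pyspark. The sketch below is a hypothetical helper: `check_packages` is our own name and is not part of Hudi or Spark. It assumes `--packages` holds comma-separated Maven coordinates of the form `group:artifact_<scala>:version`, as in the command above. It reports two problems: Scala suffixes that differ between packages, and a spark-avro version that differs from the Spark version you pass in.

```python
def check_packages(packages, spark_version):
    """Hypothetical pre-flight check for a --packages string (not a Hudi/Spark API).

    Assumes comma-separated Maven coordinates shaped like group:artifact_<scala>:version.
    Returns a list of human-readable problems; an empty list means none were found.
    """
    problems = []
    scala_versions = set()
    for coordinate in packages.split(","):
        _group, artifact, version = coordinate.strip().split(":")
        # "spark-avro_2.11" -> name "spark-avro", scala "2.11"
        name, _, scala = artifact.rpartition("_")
        scala_versions.add(scala)
        if name == "spark-avro" and version != spark_version:
            problems.append(f"spark-avro {version} does not match Spark {spark_version}")
    if len(scala_versions) > 1:
        problems.append(f"mixed Scala versions: {sorted(scala_versions)}")
    return problems


packages = ("org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,"
            "org.apache.spark:spark-avro_2.11:2.4.4")
print(check_packages(packages, "2.4.4"))  # []
```

An empty list only means the two rules from the notice hold. It does not check that this Hudi bundle supports your Spark version.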
+
+Set up the table name, base path and a data generator to generate records for this guide.
+
+```python
+# pyspark
+tableName = "hudi_trips_cow"
+basePath = "file:///tmp/hudi_trips_cow"
+dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
+```
+
+The [DataGenerator](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L50)
+can generate sample inserts and updates based on the sample trip schema [here](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
+{: .notice--info}
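
You may want to see the shape of the input without going through the JVM. The sketch below is a hypothetical pure-Python stand-in for `dataGen.generateInserts(n)` followed by `convertToStringList`. Like the real pair, it returns a list of JSON strings, which is what `spark.read.json` consumes in the next step. The field list is an assumption limited to the columns this guide queries, and the real trip schema in the linked `QuickstartUtils.java` may carry more fields. The partition values are illustrative; they only preserve the three-level `region/country/city` shape the guide relies on.

```python
import json
import random
import uuid

# Illustrative partition values (assumption): region/country/city, i.e. 3 levels.
PARTITIONS = [
    "americas/united_states/san_francisco",
    "americas/brazil/sao_paulo",
    "asia/india/chennai",
]


def generate_inserts(n, seed=0):
    """Hypothetical stand-in for DataGenerator.generateInserts + convertToStringList.

    Returns n JSON strings carrying only the columns this guide queries.
    """
    rng = random.Random(seed)  # seeded so repeated runs produce the same records
    records = []
    for _ in range(n):
        records.append(json.dumps({
            "uuid": str(uuid.UUID(int=rng.getrandbits(128))),
            "partitionpath": rng.choice(PARTITIONS),
            "ts": 0,
            "rider": "rider-000",
            "driver": "driver-000",
            "fare": round(rng.uniform(0, 100), 2),
            "begin_lat": rng.random(),
            "begin_lon": rng.random(),
        }))
    return records


inserts = generate_inserts(10)
print(json.loads(inserts[0])["partitionpath"])
```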
+
+
+## Insert data
+
+Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
+
+```python
+# pyspark
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+ 'hoodie.table.name': tableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': tableName,
+ 'hoodie.datasource.write.operation': 'upsert',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+ options(**hudi_options). \
+ mode("overwrite"). \
+ save(basePath)
+```
+
+`mode("overwrite")` overwrites and recreates the table if it already exists.
+You can check the data generated under `/tmp/hudi_trips_cow/<region>/<country>/<city>/`. We provided a record key
+(`uuid` in [schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/country/city`) and combine logic (`ts` in
+[schema](https://github.com/apache/incubator-hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to
+[Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi)
+and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html).
+Here we are using the default write operation, `upsert`. If you have a workload without updates, you can also issue
+`insert` or `bulk_insert` operations, which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations)
+{: .notice--info}
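
The record key, partition path and combine field interact when a single batch contains several versions of the same record. The pure-Python sketch below illustrates that idea under simplifying assumptions of our own. Records are keyed by `(partitionpath, uuid)`, and within a batch the version with the largest `ts` wins, which is our understanding of how the default payload pre-combines. It is not Hudi's implementation, and on ties it simply keeps the first record seen.

```python
def precombine(records, key_field="uuid", partition_field="partitionpath",
               combine_field="ts"):
    """Keep one record per (partition, key): the one with the largest combine_field.

    Simplified illustration only; on ties the record seen first is kept.
    """
    latest = {}
    for record in records:
        hoodie_key = (record[partition_field], record[key_field])
        current = latest.get(hoodie_key)
        if current is None or record[combine_field] > current[combine_field]:
            latest[hoodie_key] = record
    return list(latest.values())


batch = [
    {"uuid": "a", "partitionpath": "asia/india/chennai", "ts": 1, "fare": 10.0},
    {"uuid": "a", "partitionpath": "asia/india/chennai", "ts": 5, "fare": 27.5},
    {"uuid": "b", "partitionpath": "asia/india/chennai", "ts": 2, "fare": 33.1},
]
for record in precombine(batch):
    print(record["uuid"], record["ts"], record["fare"])
```

Because the key includes the partition path, the same `uuid` in two different partitions counts as two distinct records. This matches "unique within each partition" above.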
+
+## Query data
+
+Load the data files into a DataFrame.
+
+```python
+# pyspark
+tripsSnapshotDF = spark. \
+ read. \
+ format("hudi"). \
+ load(basePath + "/*/*/*/*")
+
+tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+
+spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
+spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
+```
+
+This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is nested 3 levels
+from the base path, we've used `load(basePath + "/*/*/*/*")`.
+Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported.
+{: .notice--info}
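
The number of `*` segments follows from the partition depth: one per partition level, plus one for the data files inside the leaf directory. The small helper below makes that arithmetic explicit. `snapshot_glob` is a hypothetical name of ours, not a Hudi API.

```python
def snapshot_glob(base_path, partition_fields):
    """Hypothetical helper: one "/*" per partition level plus one for the data files."""
    depth = len(partition_fields.split("/"))
    return base_path + "/*" * (depth + 1)


base_path = "file:///tmp/hudi_trips_cow"
print(snapshot_glob(base_path, "region/country/city"))
# file:///tmp/hudi_trips_cow/*/*/*/*
```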
+
+## Update data
+
+This is similar to inserting new data. Generate updates to existing trips using the data generator, load into a DataFrame
+and write the DataFrame into the Hudi table.
+
+```python
+# pyspark
+updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
+df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+df.write.format("hudi"). \
+ options(**hudi_options). \
+ mode("append"). \
+ save(basePath)
+```
+
+Notice that the save mode is now `append`. In general, always use append mode unless you are trying to create the table for the first time.
+[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](http://hudi.incubator.apache.org/docs/concepts.html)
+denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in the previous commit.
+{: .notice--info}
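
The effect of the update can be modelled without Spark. The sketch below is a simplified, hypothetical model of upsert semantics on a dict keyed by `(partitionpath, uuid)`:

- Incoming records replace existing ones with the same key.
- New keys are added.
- Every written record is stamped with the new commit time.

The commit-time strings are illustrative values in a `yyyyMMddHHmmss` style. The sketch ignores file layout, indexing and pre-combining.

```python
def upsert(table, batch, commit_time):
    """Simplified model of upsert: update existing keys, insert new ones."""
    merged = dict(table)  # copy, so the previous snapshot is left untouched
    for record in batch:
        key = (record["partitionpath"], record["uuid"])
        merged[key] = {**record, "_hoodie_commit_time": commit_time}
    return merged


first = upsert({}, [
    {"uuid": "a", "partitionpath": "asia/india/chennai", "rider": "rider-1"},
    {"uuid": "b", "partitionpath": "asia/india/chennai", "rider": "rider-1"},
], commit_time="20200503201817")

second = upsert(first, [
    {"uuid": "a", "partitionpath": "asia/india/chennai", "rider": "rider-2"},
    {"uuid": "c", "partitionpath": "americas/brazil/sao_paulo", "rider": "rider-2"},
], commit_time="20200503202001")

for key, record in sorted(second.items()):
    print(key, record["rider"], record["_hoodie_commit_time"])
```

Record `b` keeps its first commit time, while `a` shows the new `rider` and commit time. This is the kind of change the notice above suggests looking for.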
+
+## Incremental query
+
+Hudi also provides the capability to obtain a stream of records that changed since a given commit timestamp.
+This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed.
+We do not need to specify endTime if we want all changes after the given commit (as is the common case).
+
+```python
+# pyspark
+# reload data
+spark. \
+ read. \
+ format("hudi"). \
+ load(basePath + "/*/*/*/*"). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
+
+# incrementally query data
+incremental_read_options = {
+ 'hoodie.datasource.query.type': 'incremental',
+ 'hoodie.datasource.read.begin.instanttime': beginTime,
+}
+
+tripsIncrementalDF = spark.read.format("hudi"). \
+ options(**incremental_read_options). \
+ load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
+```
+
+This will give all changes that happened after the beginTime commit, filtered to fare > 20.0. The unique thing about this
+feature is that it now lets you author streaming pipelines on batch data.
+{: .notice--info}
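
The incremental pull boils down to a filter on commit time. The sketch below models each written record version as a row with a `_hoodie_commit_time` and mirrors the steps above:

1. Collect the distinct commit times in order.
2. Take the second-to-last one as `beginTime`.
3. Keep the rows committed strictly after it.
4. Apply the `fare > 20.0` filter.

Comparing the times as plain strings works here only because the illustrative instant times share one fixed-width format. The rows are made up for the example.

```python
rows = [
    {"uuid": "a", "fare": 35.0, "_hoodie_commit_time": "20200503201817"},
    {"uuid": "b", "fare": 12.0, "_hoodie_commit_time": "20200503201817"},
    {"uuid": "a", "fare": 41.5, "_hoodie_commit_time": "20200503202001"},
    {"uuid": "b", "fare": 19.0, "_hoodie_commit_time": "20200503202001"},
]


def incremental_view(rows, begin_time):
    """Rows written by commits strictly after begin_time (begin is exclusive)."""
    return [r for r in rows if r["_hoodie_commit_time"] > begin_time]


commits = sorted({r["_hoodie_commit_time"] for r in rows})
begin_time = commits[len(commits) - 2]  # same choice as commits[-2]
changed = [r for r in incremental_view(rows, begin_time) if r["fare"] > 20.0]
print([(r["uuid"], r["fare"]) for r in changed])  # [('a', 41.5)]
```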
+
+## Point in time query
+
+Let's look at how to query data as of a specific time. The specific time can be represented by pointing endTime to a
+specific commit time and beginTime to "000" (denoting the earliest possible commit time).
+
+```python
+# pyspark
+beginTime = "000" # Represents all commits > this time.
+endTime = commits[len(commits) - 2]
+
+# query point in time data
+point_in_time_read_options = {
+ 'hoodie.datasource.query.type': 'incremental',
+ 'hoodie.datasource.read.end.instanttime': endTime,
+ 'hoodie.datasource.read.begin.instanttime': beginTime
+}
+
+tripsPointInTimeDF = spark.read.format("hudi"). \
+ options(**point_in_time_read_options). \
+ load(basePath)
+
+tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
+```
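
The point-in-time read is the same filter with an upper bound. In the sketch below, `beginTime` is exclusive and `endTime` is inclusive. That is our reading of how Hudi bounds incremental ranges; treat it as an assumption and check it against your Hudi version. `"000"` works as the lower bound because it sorts before every real instant time. The rows are illustrative.

```python
rows = [
    {"uuid": "a", "fare": 35.0, "_hoodie_commit_time": "20200503201817"},
    {"uuid": "b", "fare": 12.0, "_hoodie_commit_time": "20200503201817"},
    {"uuid": "a", "fare": 41.5, "_hoodie_commit_time": "20200503202001"},
]


def point_in_time_view(rows, begin_time, end_time):
    """Rows with begin_time < commit time <= end_time (bounds are an assumption)."""
    return [r for r in rows if begin_time < r["_hoodie_commit_time"] <= end_time]


commits = sorted({r["_hoodie_commit_time"] for r in rows})
end_time = commits[len(commits) - 2]
as_of = [r for r in point_in_time_view(rows, "000", end_time) if r["fare"] > 20.0]
print([(r["uuid"], r["fare"]) for r in as_of])  # [('a', 35.0)]
```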
+
+## Delete data {#deletes}
+Delete records for the HoodieKeys passed in.
+
+Note: Only `Append` mode is supported for the delete operation.
+
+```python
+# pyspark
+# fetch total records count
+spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
+# fetch two records to be deleted
+ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)
+
+# issue deletes
+hudi_delete_options = {
+ 'hoodie.table.name': tableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': tableName,
+ 'hoodie.datasource.write.operation': 'delete',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
+}
+
+from pyspark.sql.functions import lit
+deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
+# each row is (uuid, partitionPath), so name the columns in that order
+df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
+df.write.format("hudi"). \
+ options(**hudi_delete_options). \
+ mode("append"). \
+ save(basePath)
+
+# run the same read query as above.
+roAfterDeleteViewDF = spark. \
+ read. \
+ format("hudi"). \
+ load(basePath + "/*/*/*/*")
+roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
+# fetch should return (total - 2) records
+spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
+```
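
The delete step needs only the key fields. The column names must follow the order in which the query selected them (`uuid`, then `partitionPath`). The pure-Python sketch below mirrors that bookkeeping on a hypothetical dict-based table keyed by `(partitionpath, uuid)`:

1. Turn two selected rows into named delete records.
2. Remove the matching keys.
3. Confirm the count drops by two.

It models the outcome only, not how Hudi writes the delete.

```python
table = {
    ("asia/india/chennai", "a"): {"fare": 35.0},
    ("asia/india/chennai", "b"): {"fare": 12.0},
    ("americas/brazil/sao_paulo", "c"): {"fare": 27.0},
}

# Like ds.collect(): tuples in the selected column order (uuid, partitionPath).
selected = [(record_key, path) for (path, record_key) in list(table)[:2]]

# Name the columns in the same order as the tuples, as toDF() would.
columns = ["uuid", "partitionpath"]
delete_records = [dict(zip(columns, row)) for row in selected]


def apply_deletes(table, delete_records):
    """Drop every record whose (partitionpath, uuid) key appears in delete_records."""
    doomed = {(d["partitionpath"], d["uuid"]) for d in delete_records}
    return {key: value for key, value in table.items() if key not in doomed}


after = apply_deletes(table, delete_records)
print(len(table), len(after))  # 3 1
```

Swapping the two names in `columns` would make every delete key point at a non-existent record, and the count would not change.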
+
+
+
## Where to go from here?
You can also do the quickstart by [building hudi
yourself](https://github.com/apache/incubator-hudi#building-apache-hudi-from-source),