This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 66724f54b1 [Docs] Add Spark Structured Streaming docs to Spark quick 
start guide (#7253)
66724f54b1 is described below

commit 66724f54b1a40e48ed84f125f28274c292d03b08
Author: kazdy <[email protected]>
AuthorDate: Mon Nov 21 11:36:10 2022 +0100

    [Docs] Add Spark Structured Streaming docs to Spark quick start guide 
(#7253)
---
 website/docs/quick-start-guide.md                  | 187 +++++++++++++++++++++
 .../version-0.10.0/quick-start-guide.md            | 177 +++++++++++++++++++
 .../version-0.10.1/quick-start-guide.md            | 177 +++++++++++++++++++
 .../version-0.11.0/quick-start-guide.md            | 177 +++++++++++++++++++
 .../version-0.11.1/quick-start-guide.md            | 177 +++++++++++++++++++
 .../version-0.12.0/quick-start-guide.md            | 177 +++++++++++++++++++
 .../version-0.12.1/quick-start-guide.md            | 177 +++++++++++++++++++
 .../version-0.8.0/quick-start-guide.md             |  89 ++++++++++
 .../version-0.9.0/quick-start-guide.md             | 177 +++++++++++++++++++
 9 files changed, 1515 insertions(+)

diff --git a/website/docs/quick-start-guide.md 
b/website/docs/quick-start-guide.md
index e00f6cdb25..64ba8e8360 100644
--- a/website/docs/quick-start-guide.md
+++ b/website/docs/quick-start-guide.md
@@ -899,6 +899,193 @@ This will give all changes that happened after the 
beginTime commit with the fil
 feature is that it now lets you author streaming pipelines on batch data.
 :::
 
+## Structured Streaming
+
+Hudi supports Spark Structured Streaming reads and writes.
+Structured Streaming reads are based on Hudi Incremental Query feature, 
therefore streaming read can return data for which commits and base files were 
not yet removed by the cleaner. You can control commits retention time.
+
+### Streaming Read
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// reload data
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Overwrite).
+  save(basePath)
+
+// read stream and output results to console
+spark.readStream.
+  format("hudi").
+  load(basePath).
+  writeStream.
+  format("console").
+  start()
+
+// read stream to streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# reload data
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
+    dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+    'hoodie.table.name': tableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': tableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+    options(**hudi_options). \
+    mode("overwrite"). \
+    save(basePath)
+
+# read stream to streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# read stream and output results to console
+spark.readStream \
+    .format("hudi") \
+    .load(basePath) \
+    .writeStream \
+    .format("console") \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+### Streaming Write
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// prepare to stream write to new table
+import org.apache.spark.sql.streaming.Trigger
+
+val streamingTableName = "hudi_trips_cow_streaming"
+val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+// create streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+// write stream to new hudi table
+df.writeStream.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, streamingTableName).
+  outputMode("append").
+  option("path", baseStreamingPath).
+  option("checkpointLocation", checkpointLocation).
+  trigger(Trigger.Once()).
+  start()
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# prepare to stream write to new table
+streamingTableName = "hudi_trips_cow_streaming"
+baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+hudi_streaming_options = {
+    'hoodie.table.name': streamingTableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': streamingTableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+# create streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# write stream to new hudi table
+df.writeStream.format("hudi") \
+    .options(**hudi_streaming_options) \
+    .outputMode("append") \
+    .option("path", baseStreamingPath) \
+    .option("checkpointLocation", checkpointLocation) \
+    .trigger(once=True) \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+:::info
+Spark SQL can be used within ForeachBatch sink to do INSERT, UPDATE, DELETE 
and MERGE INTO.
+Target table must exist before write.
+:::
+
+### Table maintenance
+Hudi can run async or inline table services while running a Structured Streaming 
query and takes care of cleaning and compaction. There's no operational 
overhead for the user.
+
+Hive Sync works with Structured Streaming: it will create the table if it does not exist 
and synchronize the table in the metastore after each streaming write.
+
+:::info
+If you're using Foreach or ForeachBatch streaming sink you must explicitly use 
inline table services. 
+Async table services are not supported.
+:::
+
 ## Point in time query
 
 Lets look at how to query data as of a specific time. The specific time can be 
represented by pointing endTime to a 
diff --git a/website/versioned_docs/version-0.10.0/quick-start-guide.md 
b/website/versioned_docs/version-0.10.0/quick-start-guide.md
index 6847e20a79..f5392bd05b 100644
--- a/website/versioned_docs/version-0.10.0/quick-start-guide.md
+++ b/website/versioned_docs/version-0.10.0/quick-start-guide.md
@@ -805,6 +805,183 @@ This will give all changes that happened after the 
beginTime commit with the fil
 feature is that it now lets you author streaming pipelines on batch data.
 :::
 
+## Structured Streaming
+
+Hudi supports Spark Structured Streaming reads and writes.
+Structured Streaming reads are based on Hudi Incremental Query feature, 
therefore streaming read can return data for which commits and base files were 
not yet removed by the cleaner. You can control commits retention time.
+
+### Streaming Read
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// reload data
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Overwrite).
+  save(basePath)
+
+// read stream and output results to console
+spark.readStream.
+  format("hudi").
+  load(basePath).
+  writeStream.
+  format("console").
+  start()
+
+// read stream to streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# reload data
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
+    dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+    'hoodie.table.name': tableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': tableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+    options(**hudi_options). \
+    mode("overwrite"). \
+    save(basePath)
+
+# read stream to streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# read stream and output results to console
+spark.readStream \
+    .format("hudi") \
+    .load(basePath) \
+    .writeStream \
+    .format("console") \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+### Streaming Write
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// prepare to stream write to new table
+import org.apache.spark.sql.streaming.Trigger
+
+val streamingTableName = "hudi_trips_cow_streaming"
+val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+// create streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+// write stream to new hudi table
+df.writeStream.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, streamingTableName).
+  outputMode("append").
+  option("path", baseStreamingPath).
+  option("checkpointLocation", checkpointLocation).
+  trigger(Trigger.Once()).
+  start()
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# prepare to stream write to new table
+streamingTableName = "hudi_trips_cow_streaming"
+baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+hudi_streaming_options = {
+    'hoodie.table.name': streamingTableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': streamingTableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+# create streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# write stream to new hudi table
+df.writeStream.format("hudi") \
+    .options(**hudi_streaming_options) \
+    .outputMode("append") \
+    .option("path", baseStreamingPath) \
+    .option("checkpointLocation", checkpointLocation) \
+    .trigger(once=True) \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+:::info
+Spark SQL can be used within ForeachBatch sink to do INSERT, UPDATE, DELETE 
and MERGE INTO.
+Target table must exist before write.
+:::
+
 ## Point in time query
 
 Lets look at how to query data as of a specific time. The specific time can be 
represented by pointing endTime to a
diff --git a/website/versioned_docs/version-0.10.1/quick-start-guide.md 
b/website/versioned_docs/version-0.10.1/quick-start-guide.md
index b65d786838..7254da0fc2 100644
--- a/website/versioned_docs/version-0.10.1/quick-start-guide.md
+++ b/website/versioned_docs/version-0.10.1/quick-start-guide.md
@@ -822,6 +822,183 @@ This will give all changes that happened after the 
beginTime commit with the fil
 feature is that it now lets you author streaming pipelines on batch data.
 :::
 
+## Structured Streaming
+
+Hudi supports Spark Structured Streaming reads and writes.
+Structured Streaming reads are based on Hudi Incremental Query feature, 
therefore streaming read can return data for which commits and base files were 
not yet removed by the cleaner. You can control commits retention time.
+
+### Streaming Read
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// reload data
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Overwrite).
+  save(basePath)
+
+// read stream and output results to console
+spark.readStream.
+  format("hudi").
+  load(basePath).
+  writeStream.
+  format("console").
+  start()
+
+// read stream to streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# reload data
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
+    dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+    'hoodie.table.name': tableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': tableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+    options(**hudi_options). \
+    mode("overwrite"). \
+    save(basePath)
+
+# read stream to streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# read stream and output results to console
+spark.readStream \
+    .format("hudi") \
+    .load(basePath) \
+    .writeStream \
+    .format("console") \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+### Streaming Write
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// prepare to stream write to new table
+import org.apache.spark.sql.streaming.Trigger
+
+val streamingTableName = "hudi_trips_cow_streaming"
+val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+// create streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+// write stream to new hudi table
+df.writeStream.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, streamingTableName).
+  outputMode("append").
+  option("path", baseStreamingPath).
+  option("checkpointLocation", checkpointLocation).
+  trigger(Trigger.Once()).
+  start()
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# prepare to stream write to new table
+streamingTableName = "hudi_trips_cow_streaming"
+baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+hudi_streaming_options = {
+    'hoodie.table.name': streamingTableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': streamingTableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+# create streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# write stream to new hudi table
+df.writeStream.format("hudi") \
+    .options(**hudi_streaming_options) \
+    .outputMode("append") \
+    .option("path", baseStreamingPath) \
+    .option("checkpointLocation", checkpointLocation) \
+    .trigger(once=True) \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+:::info
+Spark SQL can be used within ForeachBatch sink to do INSERT, UPDATE, DELETE 
and MERGE INTO.
+Target table must exist before write.
+:::
+
 ## Point in time query
 
 Lets look at how to query data as of a specific time. The specific time can be 
represented by pointing endTime to a 
diff --git a/website/versioned_docs/version-0.11.0/quick-start-guide.md 
b/website/versioned_docs/version-0.11.0/quick-start-guide.md
index c2d840aae9..97305e0e0b 100644
--- a/website/versioned_docs/version-0.11.0/quick-start-guide.md
+++ b/website/versioned_docs/version-0.11.0/quick-start-guide.md
@@ -866,6 +866,183 @@ This will give all changes that happened after the 
beginTime commit with the fil
 feature is that it now lets you author streaming pipelines on batch data.
 :::
 
+## Structured Streaming
+
+Hudi supports Spark Structured Streaming reads and writes.
+Structured Streaming reads are based on Hudi Incremental Query feature, 
therefore streaming read can return data for which commits and base files were 
not yet removed by the cleaner. You can control commits retention time.
+
+### Streaming Read
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// reload data
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Overwrite).
+  save(basePath)
+
+// read stream and output results to console
+spark.readStream.
+  format("hudi").
+  load(basePath).
+  writeStream.
+  format("console").
+  start()
+
+// read stream to streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# reload data
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
+    dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+    'hoodie.table.name': tableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': tableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+    options(**hudi_options). \
+    mode("overwrite"). \
+    save(basePath)
+
+# read stream to streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# read stream and output results to console
+spark.readStream \
+    .format("hudi") \
+    .load(basePath) \
+    .writeStream \
+    .format("console") \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+### Streaming Write
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// prepare to stream write to new table
+import org.apache.spark.sql.streaming.Trigger
+
+val streamingTableName = "hudi_trips_cow_streaming"
+val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+// create streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+// write stream to new hudi table
+df.writeStream.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, streamingTableName).
+  outputMode("append").
+  option("path", baseStreamingPath).
+  option("checkpointLocation", checkpointLocation).
+  trigger(Trigger.Once()).
+  start()
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# prepare to stream write to new table
+streamingTableName = "hudi_trips_cow_streaming"
+baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+hudi_streaming_options = {
+    'hoodie.table.name': streamingTableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': streamingTableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+# create streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# write stream to new hudi table
+df.writeStream.format("hudi") \
+    .options(**hudi_streaming_options) \
+    .outputMode("append") \
+    .option("path", baseStreamingPath) \
+    .option("checkpointLocation", checkpointLocation) \
+    .trigger(once=True) \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+:::info
+Spark SQL can be used within ForeachBatch sink to do INSERT, UPDATE, DELETE 
and MERGE INTO.
+Target table must exist before write.
+:::
+
 ## Point in time query
 
 Lets look at how to query data as of a specific time. The specific time can be 
represented by pointing endTime to a 
diff --git a/website/versioned_docs/version-0.11.1/quick-start-guide.md 
b/website/versioned_docs/version-0.11.1/quick-start-guide.md
index d7d112d457..800507a05f 100644
--- a/website/versioned_docs/version-0.11.1/quick-start-guide.md
+++ b/website/versioned_docs/version-0.11.1/quick-start-guide.md
@@ -866,6 +866,183 @@ This will give all changes that happened after the 
beginTime commit with the fil
 feature is that it now lets you author streaming pipelines on batch data.
 :::
 
+## Structured Streaming
+
+Hudi supports Spark Structured Streaming reads and writes.
+Structured Streaming reads are based on Hudi Incremental Query feature, 
therefore streaming read can return data for which commits and base files were 
not yet removed by the cleaner. You can control commits retention time.
+
+### Streaming Read
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// reload data
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Overwrite).
+  save(basePath)
+
+// read stream and output results to console
+spark.readStream.
+  format("hudi").
+  load(basePath).
+  writeStream.
+  format("console").
+  start()
+
+// read stream to streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# reload data
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
+    dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+    'hoodie.table.name': tableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': tableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+    options(**hudi_options). \
+    mode("overwrite"). \
+    save(basePath)
+
+# read stream to streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# read stream and output results to console
+spark.readStream \
+    .format("hudi") \
+    .load(basePath) \
+    .writeStream \
+    .format("console") \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+### Streaming Write
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// prepare to stream write to new table
+import org.apache.spark.sql.streaming.Trigger
+
+val streamingTableName = "hudi_trips_cow_streaming"
+val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+// create streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+// write stream to new hudi table
+df.writeStream.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, streamingTableName).
+  outputMode("append").
+  option("path", baseStreamingPath).
+  option("checkpointLocation", checkpointLocation).
+  trigger(Trigger.Once()).
+  start()
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# prepare to stream write to new table
+streamingTableName = "hudi_trips_cow_streaming"
+baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+hudi_streaming_options = {
+    'hoodie.table.name': streamingTableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': streamingTableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+# create streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# write stream to new hudi table
+df.writeStream.format("hudi") \
+    .options(**hudi_streaming_options) \
+    .outputMode("append") \
+    .option("path", baseStreamingPath) \
+    .option("checkpointLocation", checkpointLocation) \
+    .trigger(once=True) \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+:::info
+Spark SQL can be used within ForeachBatch sink to do INSERT, UPDATE, DELETE 
and MERGE INTO.
+Target table must exist before write.
+:::
+
 ## Point in time query
 
 Lets look at how to query data as of a specific time. The specific time can be 
represented by pointing endTime to a 
diff --git a/website/versioned_docs/version-0.12.0/quick-start-guide.md 
b/website/versioned_docs/version-0.12.0/quick-start-guide.md
index ed7bb29698..9c27623933 100644
--- a/website/versioned_docs/version-0.12.0/quick-start-guide.md
+++ b/website/versioned_docs/version-0.12.0/quick-start-guide.md
@@ -899,6 +899,183 @@ This will give all changes that happened after the 
beginTime commit with the fil
 feature is that it now lets you author streaming pipelines on batch data.
 :::
 
+## Structured Streaming
+
+Hudi supports Spark Structured Streaming reads and writes.
+Structured Streaming reads are based on Hudi Incremental Query feature, 
therefore streaming read can return data for which commits and base files were 
not yet removed by the cleaner. You can control commits retention time.
+
+### Streaming Read
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// reload data
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Overwrite).
+  save(basePath)
+
+// read stream and output results to console
+spark.readStream.
+  format("hudi").
+  load(basePath).
+  writeStream.
+  format("console").
+  start()
+
+// read stream to streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# reload data
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
+    dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+    'hoodie.table.name': tableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': tableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+    options(**hudi_options). \
+    mode("overwrite"). \
+    save(basePath)
+
+# read stream to streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# read stream and output results to console
+spark.readStream \
+    .format("hudi") \
+    .load(basePath) \
+    .writeStream \
+    .format("console") \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+### Streaming Write
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// prepare to stream write to new table
+import org.apache.spark.sql.streaming.Trigger
+
+val streamingTableName = "hudi_trips_cow_streaming"
+val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+// create streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+// write stream to new hudi table
+df.writeStream.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, streamingTableName).
+  outputMode("append").
+  option("path", baseStreamingPath).
+  option("checkpointLocation", checkpointLocation).
+  trigger(Trigger.Once()).
+  start()
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# prepare to stream write to new table
+streamingTableName = "hudi_trips_cow_streaming"
+baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+hudi_streaming_options = {
+    'hoodie.table.name': streamingTableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': streamingTableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+# create streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# write stream to new hudi table
+df.writeStream.format("hudi") \
+    .options(**hudi_streaming_options) \
+    .outputMode("append") \
+    .option("path", baseStreamingPath) \
+    .option("checkpointLocation", checkpointLocation) \
+    .trigger(once=True) \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+:::info
+Spark SQL can be used within ForeachBatch sink to do INSERT, UPDATE, DELETE 
and MERGE INTO.
+Target table must exist before write.
+:::
+
 ## Point in time query
 
 Lets look at how to query data as of a specific time. The specific time can be 
represented by pointing endTime to a 
diff --git a/website/versioned_docs/version-0.12.1/quick-start-guide.md 
b/website/versioned_docs/version-0.12.1/quick-start-guide.md
index c610964f6c..f8f26ea440 100644
--- a/website/versioned_docs/version-0.12.1/quick-start-guide.md
+++ b/website/versioned_docs/version-0.12.1/quick-start-guide.md
@@ -899,6 +899,183 @@ This will give all changes that happened after the 
beginTime commit with the fil
 feature is that it now lets you author streaming pipelines on batch data.
 :::
 
+## Structured Streaming
+
+Hudi supports Spark Structured Streaming reads and writes.
+Structured Streaming reads are based on the Hudi Incremental Query feature, 
so a streaming read can only return data whose commits and base files have 
not yet been removed by the cleaner. You can control the commit retention time.
+
+### Streaming Read
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// reload data
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Overwrite).
+  save(basePath)
+
+// read stream and output results to console
+spark.readStream.
+  format("hudi").
+  load(basePath).
+  writeStream.
+  format("console").
+  start()
+
+// read stream to streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# reload data
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
+    dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+    'hoodie.table.name': tableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': tableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+    options(**hudi_options). \
+    mode("overwrite"). \
+    save(basePath)
+
+# read stream to streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# read stream and output results to console
+spark.readStream \
+    .format("hudi") \
+    .load(basePath) \
+    .writeStream \
+    .format("console") \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+### Streaming Write
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// prepare to stream write to new table
+import org.apache.spark.sql.streaming.Trigger
+
+val streamingTableName = "hudi_trips_cow_streaming"
+val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+// create streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+// write stream to new hudi table
+df.writeStream.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, streamingTableName).
+  outputMode("append").
+  option("path", baseStreamingPath).
+  option("checkpointLocation", checkpointLocation).
+  trigger(Trigger.Once()).
+  start()
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# prepare to stream write to new table
+streamingTableName = "hudi_trips_cow_streaming"
+baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+hudi_streaming_options = {
+    'hoodie.table.name': streamingTableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': streamingTableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+# create streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# write stream to new hudi table
+df.writeStream.format("hudi") \
+    .options(**hudi_streaming_options) \
+    .outputMode("append") \
+    .option("path", baseStreamingPath) \
+    .option("checkpointLocation", checkpointLocation) \
+    .trigger(once=True) \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+:::info
+Spark SQL can be used within a `foreachBatch` sink to run INSERT, UPDATE, DELETE 
and MERGE INTO statements.
+The target table must exist before the write.
+:::
+
 ## Point in time query
 
 Lets look at how to query data as of a specific time. The specific time can be 
represented by pointing endTime to a 
diff --git a/website/versioned_docs/version-0.8.0/quick-start-guide.md 
b/website/versioned_docs/version-0.8.0/quick-start-guide.md
index 2e1d9ec13d..d7db345947 100644
--- a/website/versioned_docs/version-0.8.0/quick-start-guide.md
+++ b/website/versioned_docs/version-0.8.0/quick-start-guide.md
@@ -358,6 +358,95 @@ This will give all changes that happened after the 
beginTime commit with the fil
 feature is that it now lets you author streaming pipelines on batch data.
 :::
 
+## Structured Streaming
+
+Hudi supports Spark Structured Streaming reads.
+Structured Streaming reads are based on the Hudi Incremental Query feature, 
so a streaming read can only return data whose commits and base files have 
not yet been removed by the cleaner. You can control the commit retention time.
+
+### Streaming Read
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// reload data
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Overwrite).
+  save(basePath)
+
+// read stream and output results to console
+spark.readStream.
+  format("hudi").
+  load(basePath).
+  writeStream.
+  format("console").
+  start()
+
+// read stream to streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# reload data
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
+    dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+    'hoodie.table.name': tableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': tableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+    options(**hudi_options). \
+    mode("overwrite"). \
+    save(basePath)
+
+# read stream to streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# read stream and output results to console
+spark.readStream \
+    .format("hudi") \
+    .load(basePath) \
+    .writeStream \
+    .format("console") \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
 ## Point in time query
 
 Lets look at how to query data as of a specific time. The specific time can be 
represented by pointing endTime to a 
diff --git a/website/versioned_docs/version-0.9.0/quick-start-guide.md 
b/website/versioned_docs/version-0.9.0/quick-start-guide.md
index 6a7e1cc7c8..bb226555c4 100644
--- a/website/versioned_docs/version-0.9.0/quick-start-guide.md
+++ b/website/versioned_docs/version-0.9.0/quick-start-guide.md
@@ -776,6 +776,183 @@ This will give all changes that happened after the 
beginTime commit with the fil
 feature is that it now lets you author streaming pipelines on batch data.
 :::
 
+## Structured Streaming
+
+Hudi supports Spark Structured Streaming reads and writes.
+Structured Streaming reads are based on the Hudi Incremental Query feature, 
so a streaming read can only return data whose commits and base files have 
not yet been removed by the cleaner. You can control the commit retention time.
+
+### Streaming Read
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// reload data
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Overwrite).
+  save(basePath)
+
+// read stream and output results to console
+spark.readStream.
+  format("hudi").
+  load(basePath).
+  writeStream.
+  format("console").
+  start()
+
+// read stream to streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# reload data
+inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
+    dataGen.generateInserts(10))
+df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+
+hudi_options = {
+    'hoodie.table.name': tableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': tableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+df.write.format("hudi"). \
+    options(**hudi_options). \
+    mode("overwrite"). \
+    save(basePath)
+
+# read stream to streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# read stream and output results to console
+spark.readStream \
+    .format("hudi") \
+    .load(basePath) \
+    .writeStream \
+    .format("console") \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+### Streaming Write
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', },
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+// prepare to stream write to new table
+import org.apache.spark.sql.streaming.Trigger
+
+val streamingTableName = "hudi_trips_cow_streaming"
+val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+// create streaming df
+val df = spark.readStream.
+        format("hudi").
+        load(basePath)
+
+// write stream to new hudi table
+df.writeStream.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, streamingTableName).
+  outputMode("append").
+  option("path", baseStreamingPath).
+  option("checkpointLocation", checkpointLocation).
+  trigger(Trigger.Once()).
+  start()
+
+```
+
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+# prepare to stream write to new table
+streamingTableName = "hudi_trips_cow_streaming"
+baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
+checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+
+hudi_streaming_options = {
+    'hoodie.table.name': streamingTableName,
+    'hoodie.datasource.write.recordkey.field': 'uuid',
+    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+    'hoodie.datasource.write.table.name': streamingTableName,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.precombine.field': 'ts',
+    'hoodie.upsert.shuffle.parallelism': 2,
+    'hoodie.insert.shuffle.parallelism': 2
+}
+
+# create streaming df
+df = spark.readStream \
+    .format("hudi") \
+    .load(basePath)
+
+# write stream to new hudi table
+df.writeStream.format("hudi") \
+    .options(**hudi_streaming_options) \
+    .outputMode("append") \
+    .option("path", baseStreamingPath) \
+    .option("checkpointLocation", checkpointLocation) \
+    .trigger(once=True) \
+    .start()
+
+```
+
+</TabItem>
+
+</Tabs
+>
+
+:::info
+Spark SQL can be used within a `foreachBatch` sink to run INSERT, UPDATE, DELETE 
and MERGE INTO statements.
+The target table must exist before the write.
+:::
+
 ## Point in time query
 
 Lets look at how to query data as of a specific time. The specific time can be 
represented by pointing endTime to a 


Reply via email to