bhasudha commented on code in PR #9622:
URL: https://github.com/apache/hudi/pull/9622#discussion_r1324521178
##########
website/docs/quick-start-guide.md:
##########
@@ -827,633 +789,348 @@ denoted by the timestamp. Look for changes in
`_hoodie_commit_time`, `rider`, `d
>
-## Incremental query
+## Delete data {#deletes}
-Hudi also provides capability to obtain a stream of records that changed since
given commit timestamp.
-This can be achieved using Hudi's incremental querying and providing a begin
time from which changes need to be streamed.
-We do not need to specify endTime, if we want all changes after the given
commit (as is the common case).
+Hard deletes physically remove any trace of the record from the table. For
example, this deletes records for the HoodieKeys passed in.
+Check out the [deletion section](/docs/writing_data#deletes) for more details.
+<br/><br/>
<Tabs
groupId="programming-language"
-defaultValue="python"
+defaultValue="scala"
values={[
{ label: 'Scala', value: 'scala', },
{ label: 'Python', value: 'python', },
-{ label: 'Spark SQL', value: 'sparksql', }
+{ label: 'Spark SQL', value: 'sparksql', },
]}
>
<TabItem value="scala">
+Delete records for the HoodieKeys passed in.<br/>
```scala
// spark-shell
-// reload data
-spark.
+// fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+// fetch two records to be deleted
+val ds = spark.sql("select uuid, partitionpath from
hudi_trips_snapshot").limit(2)
+
+// issue deletes
+val deletes = dataGen.generateDeletes(ds.collectAsList())
+val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
+
+hardDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "delete").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+
+// run the same read query as above.
+val roAfterDeleteViewDF = spark.
read.
format("hudi").
- load(basePath).
- createOrReplaceTempView("hudi_trips_snapshot")
+ load(basePath)
-val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime
from hudi_trips_snapshot order by commitTime").map(k =>
k.getString(0)).take(50)
-val beginTime = commits(commits.length - 2) // commit time we are interested in
+roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
+// fetch should return (total - 2) records
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+```
+:::note
+Only `Append` mode is supported for delete operation.
+:::
+</TabItem>
+<TabItem value="sparksql">
-// incrementally query data
-val tripsIncrementalDF = spark.read.format("hudi").
- option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
- option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
- load(basePath)
-tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+**Syntax**
+```sql
+DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
+```
+**Example**
+```sql
+delete from hudi_cow_nonpcf_tbl where uuid = 1;
-spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from
hudi_trips_incremental where fare > 20.0").show()
+delete from hudi_mor_tbl where id % 2 = 0;
+
+-- delete using non-PK field
+delete from hudi_cow_pt_tbl where name = 'a1';
```
</TabItem>
+
<TabItem value="python">
-```python
-# pyspark
-# reload data
-spark. \
- read. \
- format("hudi"). \
- load(basePath). \
- createOrReplaceTempView("hudi_trips_snapshot")
-
-commits = list(map(lambda row: row[0], spark.sql("select
distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by
commitTime").limit(50).collect()))
-beginTime = commits[len(commits) - 2] # commit time we are interested in
-
-# incrementally query data
-incremental_read_options = {
- 'hoodie.datasource.query.type': 'incremental',
- 'hoodie.datasource.read.begin.instanttime': beginTime,
-}
-
-tripsIncrementalDF = spark.read.format("hudi"). \
- options(**incremental_read_options). \
- load(basePath)
-tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
-
-spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from
hudi_trips_incremental where fare > 20.0").show()
-```
-
-</TabItem>
-
-<TabItem value="sparksql">
-
-```sql
--- syntax
-hudi_table_changes(table or path, queryType, beginTime [, endTime]);
--- table or path: table identifier, example: db.tableName, tableName,
--- or path for of your table, example: path/to/hudiTable
--- in this case table does not need to exist in the metastore,
--- queryType: incremental query mode, example: latest_state, cdc
--- (for cdc query, first enable cdc for your table by setting
cdc.enabled=true),
--- beginTime: instantTime to begin query from, example: earliest,
202305150000,
--- endTime: optional instantTime to end query at, example: 202305160000,
-
--- incrementally query data by table name
--- start from earliest available commit, end at latest available commit.
-select * from hudi_table_changes('db.table', 'latest_state', 'earliest');
-
--- start from earliest, end at 202305160000.
-select * from hudi_table_changes('table', 'latest_state', 'earliest',
'202305160000');
-
--- start from 202305150000, end at 202305160000.
-select * from hudi_table_changes('table', 'latest_state', '202305150000',
'202305160000');
-
--- incrementally query data by path
--- start from earliest available commit, end at latest available commit.
-select * from hudi_table_changes('path/to/table', 'cdc', 'earliest');
-
--- start from earliest, end at 202305160000.
-select * from hudi_table_changes('path/to/table', 'cdc', 'earliest',
'202305160000');
-
--- start from 202305150000, end at 202305160000.
-select * from hudi_table_changes('path/to/table', 'cdc', '202305150000',
'202305160000');
-```
-
-</TabItem>
-
-</Tabs>
-
-:::info
-This will give all changes that happened after the beginTime commit with the
filter of fare > 20.0. The unique thing about this
-feature is that it now lets you author streaming pipelines on batch data.
+:::note
+Only `Append` mode is supported for delete operation.
:::
-## Structured Streaming
-
-Hudi supports Spark Structured Streaming reads and writes.
-Structured Streaming reads are based on Hudi Incremental Query feature,
therefore streaming read can return data for which commits and base files were
not yet removed by the cleaner. You can control commits retention time.
-
-### Streaming Read
-
-<Tabs
-groupId="programming-language"
-defaultValue="python"
-values={[
-{ label: 'Scala', value: 'scala', },
-{ label: 'Python', value: 'python', },
-]}
->
-
-<TabItem value="scala">
-
-```scala
-// spark-shell
-// reload data
-df.write.format("hudi").
- options(getQuickstartWriteConfigs).
- option(PRECOMBINE_FIELD_OPT_KEY, "ts").
- option(RECORDKEY_FIELD_OPT_KEY, "uuid").
- option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
- option(TABLE_NAME, tableName).
- mode(Overwrite).
- save(basePath)
-
-// read stream and output results to console
-spark.readStream.
- format("hudi").
- load(basePath).
- writeStream.
- format("console").
- start()
-
-// read stream to streaming df
-val df = spark.readStream.
- format("hudi").
- load(basePath)
-
-```
-
-</TabItem>
-<TabItem value="python">
-
```python
# pyspark
-# reload data
-inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
- dataGen.generateInserts(10))
-df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+# fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+# fetch two records to be deleted
+ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
-hudi_options = {
- 'hoodie.table.name': tableName,
- 'hoodie.datasource.write.recordkey.field': 'uuid',
- 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
- 'hoodie.datasource.write.table.name': tableName,
- 'hoodie.datasource.write.operation': 'upsert',
- 'hoodie.datasource.write.precombine.field': 'ts',
- 'hoodie.upsert.shuffle.parallelism': 2,
- 'hoodie.insert.shuffle.parallelism': 2
+# issue deletes
+hudi_hard_delete_options = {
+ 'hoodie.table.name': tableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': tableName,
+ 'hoodie.datasource.write.operation': 'delete',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
}
-df.write.format("hudi"). \
- options(**hudi_options). \
- mode("overwrite"). \
- save(basePath)
+from pyspark.sql.functions import lit
+deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
+hard_delete_df = spark.sparkContext.parallelize(deletes).toDF(['uuid',
'partitionpath']).withColumn('ts', lit(0.0))
+hard_delete_df.write.format("hudi"). \
+ options(**hudi_hard_delete_options). \
+ mode("append"). \
+ save(basePath)
-# read stream to streaming df
-df = spark.readStream \
- .format("hudi") \
- .load(basePath)
+# run the same read query as above.
+roAfterDeleteViewDF = spark. \
+ read. \
+ format("hudi"). \
+ load(basePath)
+```
-# ead stream and output results to console
-spark.readStream \
- .format("hudi") \
- .load(basePath) \
- .writeStream \
- .format("console") \
- .start()
+```python
+roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
+# fetch should return (total - 2) records
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
```
</TabItem>
</Tabs
>
-### Streaming Write
+## Advanced Querying capabilities
+
+### Time Travel Query
+
+Hudi supports time travel query since 0.9.0. Currently three query time
formats are supported as given below.
<Tabs
groupId="programming-language"
-defaultValue="python"
+defaultValue="scala"
values={[
{ label: 'Scala', value: 'scala', },
{ label: 'Python', value: 'python', },
+{ label: 'Spark SQL', value: 'sparksql', },
]}
>
<TabItem value="scala">
```scala
-// spark-shell
-// prepare to stream write to new table
-import org.apache.spark.sql.streaming.Trigger
-
-val streamingTableName = "hudi_trips_cow_streaming"
-val baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
-val checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
+spark.read.
+ format("hudi").
+ option("as.of.instant", "20210728141108100").
+ load(basePath)
-// create streaming df
-val df = spark.readStream.
- format("hudi").
- load(basePath)
+spark.read.
+ format("hudi").
+ option("as.of.instant", "2021-07-28 14:11:08.200").
+ load(basePath)
-// write stream to new hudi table
-df.writeStream.format("hudi").
- options(getQuickstartWriteConfigs).
- option(PRECOMBINE_FIELD_OPT_KEY, "ts").
- option(RECORDKEY_FIELD_OPT_KEY, "uuid").
- option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
- option(TABLE_NAME, streamingTableName).
- outputMode("append").
- option("path", baseStreamingPath).
- option("checkpointLocation", checkpointLocation).
- trigger(Trigger.Once()).
- start()
+// It is equal to "as.of.instant = 2021-07-28 00:00:00"
+spark.read.
+ format("hudi").
+ option("as.of.instant", "2021-07-28").
+ load(basePath)
```
</TabItem>
<TabItem value="python">
```python
-# pyspark
-# prepare to stream write to new table
-streamingTableName = "hudi_trips_cow_streaming"
-baseStreamingPath = "file:///tmp/hudi_trips_cow_streaming"
-checkpointLocation = "file:///tmp/checkpoints/hudi_trips_cow_streaming"
-
-hudi_streaming_options = {
- 'hoodie.table.name': streamingTableName,
- 'hoodie.datasource.write.recordkey.field': 'uuid',
- 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
- 'hoodie.datasource.write.table.name': streamingTableName,
- 'hoodie.datasource.write.operation': 'upsert',
- 'hoodie.datasource.write.precombine.field': 'ts',
- 'hoodie.upsert.shuffle.parallelism': 2,
- 'hoodie.insert.shuffle.parallelism': 2
-}
-
-# create streaming df
-df = spark.readStream \
- .format("hudi") \
- .load(basePath)
+#pyspark
+spark.read. \
+ format("hudi"). \
+ option("as.of.instant", "20210728141108"). \
+ load(basePath)
-# write stream to new hudi table
-df.writeStream.format("hudi") \
- .options(**hudi_streaming_options) \
- .outputMode("append") \
- .option("path", baseStreamingPath) \
- .option("checkpointLocation", checkpointLocation) \
- .trigger(once=True) \
- .start()
+spark.read. \
+ format("hudi"). \
+ option("as.of.instant", "2021-07-28 14:11:08.000"). \
+ load(basePath)
+# It is equal to "as.of.instant = 2021-07-28 00:00:00"
+spark.read. \
+ format("hudi"). \
+ option("as.of.instant", "2021-07-28"). \
+ load(basePath)
```
</TabItem>
-
-</Tabs
->
-
-:::info
-Spark SQL can be used within ForeachBatch sink to do INSERT, UPDATE, DELETE
and MERGE INTO.
-Target table must exist before write.
-:::
-
-### Table maintenance
-Hudi can run async or inline table services while running Strucrured Streaming
query and takes care of cleaning, compaction and clustering. There's no
operational overhead for the user.
-For CoW tables, table services work in inline mode by default.
-For MoR tables, some async services are enabled by default.
+<TabItem value="sparksql">
:::note
-Since Hudi 0.11 Metadata Table is enabled by default. When using async table
services with Metadata Table enabled you must use Optimistic Concurrency
Control to avoid the risk of data loss (even in single writer scenario). See
[Metadata Table deployment
considerations](/docs/next/metadata#deployment-considerations) for detailed
instructions.
-
-If you're using Foreach or ForeachBatch streaming sink you must use inline
table services, async table services are not supported.
+Requires Spark 3.2+
:::
-Hive Sync works with Structured Streaming, it will create table if not exists
and synchronize table to metastore aftear each streaming write.
-
-## Point in time query
-
-Lets look at how to query data as of a specific time. The specific time can be
represented by pointing endTime to a
-specific commit time and beginTime to "000" (denoting earliest possible commit
time).
-
-<Tabs
-groupId="programming-language"
-defaultValue="python"
-values={[
-{ label: 'Scala', value: 'scala', },
-{ label: 'Python', value: 'python', },
-]}
->
-
-<TabItem value="scala">
-
-```scala
-// spark-shell
-val beginTime = "000" // Represents all commits > this time.
-val endTime = commits(commits.length - 2) // commit time we are interested in
-
-//incrementally query data
-val tripsPointInTimeDF = spark.read.format("hudi").
- option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
- option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
- option(END_INSTANTTIME_OPT_KEY, endTime).
- load(basePath)
-tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
-spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from
hudi_trips_point_in_time where fare > 20.0").show()
-```
-
-</TabItem>
-<TabItem value="python">
-
-```python
-# pyspark
-beginTime = "000" # Represents all commits > this time.
-endTime = commits[len(commits) - 2]
+```sql
+create table hudi_cow_pt_tbl (
+ id bigint,
+ name string,
+ ts bigint,
+ dt string,
+ hh string
+) using hudi
+tblproperties (
+ type = 'cow',
+ primaryKey = 'id',
+ preCombineField = 'ts'
+ )
+partitioned by (dt, hh)
+location '/tmp/hudi/hudi_cow_pt_tbl';
-# query point in time data
-point_in_time_read_options = {
- 'hoodie.datasource.query.type': 'incremental',
- 'hoodie.datasource.read.end.instanttime': endTime,
- 'hoodie.datasource.read.begin.instanttime': beginTime
-}
+insert into hudi_cow_pt_tbl select 1, 'a0', 1000, '2021-12-09', '10';
+select * from hudi_cow_pt_tbl;
-tripsPointInTimeDF = spark.read.format("hudi"). \
- options(**point_in_time_read_options). \
- load(basePath)
+-- record id=1 changes `name`
+insert into hudi_cow_pt_tbl select 1, 'a1', 1001, '2021-12-09', '10';
+select * from hudi_cow_pt_tbl;
-tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
-spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from
hudi_trips_point_in_time where fare > 20.0").show()
+-- time travel based on first commit time, assume `20220307091628793`
+select * from hudi_cow_pt_tbl timestamp as of '20220307091628793' where id = 1;
+-- time travel based on different timestamp formats
+select * from hudi_cow_pt_tbl timestamp as of '2022-03-07 09:16:28.100' where
id = 1;
+select * from hudi_cow_pt_tbl timestamp as of '2022-03-08' where id = 1;
```
</TabItem>
</Tabs
>
-## Delete data {#deletes}
-
-Apache Hudi supports two types of deletes: <br/>
-1. **Soft Deletes**: This retains the record key and just nulls out the
values for all the other fields. The records with nulls in soft deletes are
always persisted in storage and never removed.
-2. **Hard Deletes**: This physically removes any trace of the record from the
table. Check out the
-[deletion section](/docs/writing_data#deletes) for more details.
-### Soft Deletes
+### Incremental query
-Soft deletes retain the record key and null out the values for all the other
fields. For example, records with nulls in soft deletes are always persisted in
storage and never removed.<br/><br/>
+Hudi also provides capability to obtain a stream of records that changed since
given commit timestamp.
+This can be achieved using Hudi's incremental querying and providing a begin
time from which changes need to be streamed.
+We do not need to specify endTime, if we want all changes after the given
commit (as is the common case).
<Tabs
groupId="programming-language"
-defaultValue="python"
+defaultValue="scala"
values={[
{ label: 'Scala', value: 'scala', },
-{ label: 'Python', value: 'python', }
+{ label: 'Python', value: 'python', },
+{ label: 'Spark SQL', value: 'sparksql', }
]}
>
<TabItem value="scala">
```scala
// spark-shell
-spark.
- read.
- format("hudi").
- load(basePath).
- createOrReplaceTempView("hudi_trips_snapshot")
-// fetch total records count
-spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
-spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is
not null").count()
-// fetch two records for soft deletes
-val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
-
-// prepare the soft deletes by ensuring the appropriate fields are nullified
-val nullifyColumns = softDeleteDs.schema.fields.
- map(field => (field.name, field.dataType.typeName)).
- filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
- && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
-
-val softDeleteDf = nullifyColumns.
- foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
- (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
-
-// simply upsert the table after setting these fields to null
-softDeleteDf.write.format("hudi").
- options(getQuickstartWriteConfigs).
- option(OPERATION_OPT_KEY, "upsert").
- option(PRECOMBINE_FIELD_OPT_KEY, "ts").
- option(RECORDKEY_FIELD_OPT_KEY, "uuid").
- option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
- option(TABLE_NAME, tableName).
- mode(Append).
- save(basePath)
-
// reload data
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
-// This should return the same total count as before
-spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
-// This should return (total - 2) count as two records are updated with nulls
-spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is
not null").count()
+val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime
from hudi_trips_snapshot order by commitTime").map(k =>
k.getString(0)).take(50)
+val beginTime = commits(commits.length - 2) // commit time we are interested in
+
+// incrementally query data
+val tripsIncrementalDF = spark.read.format("hudi").
+ option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
+ option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
+ load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from
hudi_trips_incremental where fare > 20.0").show()
```
-:::note
-Notice that the save mode is `Append`.
-:::
+
</TabItem>
<TabItem value="python">
-:::note
-Notice that the save mode is `Append`.
-:::
-
```python
# pyspark
-from pyspark.sql.functions import lit
-from functools import reduce
-
-spark.read.format("hudi"). \
+# reload data
+spark. \
+ read. \
+ format("hudi"). \
load(basePath). \
createOrReplaceTempView("hudi_trips_snapshot")
-# fetch total records count
-spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
-spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is
not null").count()
-
-# fetch two records for soft deletes
-soft_delete_ds = spark.sql("select * from hudi_trips_snapshot").limit(2)
-
-# prepare the soft deletes by ensuring the appropriate fields are nullified
-meta_columns = ["_hoodie_commit_time", "_hoodie_commit_seqno",
"_hoodie_record_key", \
- "_hoodie_partition_path", "_hoodie_file_name"]
-excluded_columns = meta_columns + ["ts", "uuid", "partitionpath"]
-```
-```python
-nullify_columns = list(filter(lambda field: field[0] not in excluded_columns, \
- list(map(lambda field: (field.name, field.dataType),
soft_delete_ds.schema.fields))))
+commits = list(map(lambda row: row[0], spark.sql("select
distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by
commitTime").limit(50).collect()))
+beginTime = commits[len(commits) - 2] # commit time we are interested in
-hudi_soft_delete_options = {
- 'hoodie.table.name': tableName,
- 'hoodie.datasource.write.recordkey.field': 'uuid',
- 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
- 'hoodie.datasource.write.table.name': tableName,
- 'hoodie.datasource.write.operation': 'upsert',
- 'hoodie.datasource.write.precombine.field': 'ts',
- 'hoodie.upsert.shuffle.parallelism': 2,
- 'hoodie.insert.shuffle.parallelism': 2
+# incrementally query data
+incremental_read_options = {
+ 'hoodie.datasource.query.type': 'incremental',
+ 'hoodie.datasource.read.begin.instanttime': beginTime,
}
-soft_delete_df = reduce(lambda df,col: df.withColumn(col[0],
lit(None).cast(col[1])), \
- nullify_columns, reduce(lambda df,col: df.drop(col[0]), meta_columns,
soft_delete_ds))
-
-# simply upsert the table after setting these fields to null
-soft_delete_df.write.format("hudi"). \
- options(**hudi_soft_delete_options). \
- mode("append"). \
- save(basePath)
-
-# reload data
-spark.read.format("hudi"). \
- load(basePath). \
- createOrReplaceTempView("hudi_trips_snapshot")
+tripsIncrementalDF = spark.read.format("hudi"). \
+ options(**incremental_read_options). \
+ load(basePath)
+tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
-# This should return the same total count as before
-spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
-# This should return (total - 2) count as two records are updated with nulls
-spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is
not null").count()
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from
hudi_trips_incremental where fare > 20.0").show()
```
</TabItem>
-</Tabs
->
-
-### Hard Deletes
-Hard deletes physically remove any trace of the record from the table. For
example, this deletes records for the HoodieKeys passed in.<br/><br/>
-
-<Tabs
-groupId="programming-language"
-defaultValue="python"
-values={[
-{ label: 'Scala', value: 'scala', },
-{ label: 'Python', value: 'python', },
-{ label: 'Spark SQL', value: 'sparksql', },
-]}
->
-
-<TabItem value="scala">
-Delete records for the HoodieKeys passed in.<br/>
-
-```scala
-// spark-shell
-// fetch total records count
-spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
-// fetch two records to be deleted
-val ds = spark.sql("select uuid, partitionpath from
hudi_trips_snapshot").limit(2)
-
-// issue deletes
-val deletes = dataGen.generateDeletes(ds.collectAsList())
-val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
-
-hardDeleteDf.write.format("hudi").
- options(getQuickstartWriteConfigs).
- option(OPERATION_OPT_KEY, "delete").
- option(PRECOMBINE_FIELD_OPT_KEY, "ts").
- option(RECORDKEY_FIELD_OPT_KEY, "uuid").
- option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
- option(TABLE_NAME, tableName).
- mode(Append).
- save(basePath)
-
-// run the same read query as above.
-val roAfterDeleteViewDF = spark.
- read.
- format("hudi").
- load(basePath)
-
-roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
-// fetch should return (total - 2) records
-spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
-```
-:::note
-Only `Append` mode is supported for delete operation.
-:::
-</TabItem>
<TabItem value="sparksql">
-**Syntax**
```sql
Review Comment:
@nsivabalan to ensure this is consistent with scala and pyspark tabs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]