bhasudha commented on code in PR #9622:
URL: https://github.com/apache/hudi/pull/9622#discussion_r1324514022
##########
website/docs/quick-start-guide.md:
##########
@@ -827,633 +789,348 @@ denoted by the timestamp. Look for changes in
`_hoodie_commit_time`, `rider`, `d
>
-## Incremental query
+## Delete data {#deletes}
-Hudi also provides capability to obtain a stream of records that changed since
given commit timestamp.
-This can be achieved using Hudi's incremental querying and providing a begin
time from which changes need to be streamed.
-We do not need to specify endTime, if we want all changes after the given
commit (as is the common case).
+Hard deletes physically remove any trace of the record from the table. The
example below deletes the records for the HoodieKeys passed in.
+Check out the [deletion section](/docs/writing_data#deletes) for more details.
+<br/><br/>
<Tabs
groupId="programming-language"
-defaultValue="python"
+defaultValue="scala"
values={[
{ label: 'Scala', value: 'scala', },
{ label: 'Python', value: 'python', },
-{ label: 'Spark SQL', value: 'sparksql', }
+{ label: 'Spark SQL', value: 'sparksql', },
]}
>
<TabItem value="scala">
+Delete records for the HoodieKeys passed in.<br/>
```scala
// spark-shell
-// reload data
-spark.
+// fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+// fetch two records to be deleted
+val ds = spark.sql("select uuid, partitionpath from
hudi_trips_snapshot").limit(2)
+
+// issue deletes
+val deletes = dataGen.generateDeletes(ds.collectAsList())
+val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
+
+hardDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "delete").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+
+// run the same read query as above.
+val roAfterDeleteViewDF = spark.
read.
format("hudi").
- load(basePath).
- createOrReplaceTempView("hudi_trips_snapshot")
+ load(basePath)
-val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime
from hudi_trips_snapshot order by commitTime").map(k =>
k.getString(0)).take(50)
-val beginTime = commits(commits.length - 2) // commit time we are interested in
+roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
+// fetch should return (total - 2) records
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+```
+:::note
+Only `Append` mode is supported for the delete operation.
+:::
+</TabItem>
+<TabItem value="sparksql">
-// incrementally query data
-val tripsIncrementalDF = spark.read.format("hudi").
- option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
- option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
- load(basePath)
-tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
+**Syntax**
+```sql
+DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
+```
+**Example**
+```sql
+delete from hudi_cow_nonpcf_tbl where uuid = 1;
-spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from
hudi_trips_incremental where fare > 20.0").show()
+delete from hudi_mor_tbl where id % 2 = 0;
+
+-- delete using non-PK field
+delete from hudi_cow_pt_tbl where name = 'a1';
```
</TabItem>
+
<TabItem value="python">
-```python
-# pyspark
-# reload data
-spark. \
- read. \
- format("hudi"). \
- load(basePath). \
- createOrReplaceTempView("hudi_trips_snapshot")
-
-commits = list(map(lambda row: row[0], spark.sql("select
distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by
commitTime").limit(50).collect()))
-beginTime = commits[len(commits) - 2] # commit time we are interested in
-
-# incrementally query data
-incremental_read_options = {
- 'hoodie.datasource.query.type': 'incremental',
- 'hoodie.datasource.read.begin.instanttime': beginTime,
-}
-
-tripsIncrementalDF = spark.read.format("hudi"). \
- options(**incremental_read_options). \
- load(basePath)
-tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
-
-spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from
hudi_trips_incremental where fare > 20.0").show()
-```
-
-</TabItem>
-
-<TabItem value="sparksql">
-
-```sql
--- syntax
-hudi_table_changes(table or path, queryType, beginTime [, endTime]);
--- table or path: table identifier, example: db.tableName, tableName,
--- or path for of your table, example: path/to/hudiTable
--- in this case table does not need to exist in the metastore,
--- queryType: incremental query mode, example: latest_state, cdc
--- (for cdc query, first enable cdc for your table by setting
cdc.enabled=true),
--- beginTime: instantTime to begin query from, example: earliest,
202305150000,
--- endTime: optional instantTime to end query at, example: 202305160000,
-
--- incrementally query data by table name
--- start from earliest available commit, end at latest available commit.
-select * from hudi_table_changes('db.table', 'latest_state', 'earliest');
-
--- start from earliest, end at 202305160000.
-select * from hudi_table_changes('table', 'latest_state', 'earliest',
'202305160000');
-
--- start from 202305150000, end at 202305160000.
-select * from hudi_table_changes('table', 'latest_state', '202305150000',
'202305160000');
-
--- incrementally query data by path
--- start from earliest available commit, end at latest available commit.
-select * from hudi_table_changes('path/to/table', 'cdc', 'earliest');
-
--- start from earliest, end at 202305160000.
-select * from hudi_table_changes('path/to/table', 'cdc', 'earliest',
'202305160000');
-
--- start from 202305150000, end at 202305160000.
-select * from hudi_table_changes('path/to/table', 'cdc', '202305150000',
'202305160000');
-```
-
-</TabItem>
-
-</Tabs>
-
-:::info
-This will give all changes that happened after the beginTime commit with the
filter of fare > 20.0. The unique thing about this
-feature is that it now lets you author streaming pipelines on batch data.
+:::note
+Only `Append` mode is supported for the delete operation.
:::
-## Structured Streaming
-
-Hudi supports Spark Structured Streaming reads and writes.
-Structured Streaming reads are based on Hudi Incremental Query feature,
therefore streaming read can return data for which commits and base files were
not yet removed by the cleaner. You can control commits retention time.
-
-### Streaming Read
-
-<Tabs
-groupId="programming-language"
-defaultValue="python"
-values={[
-{ label: 'Scala', value: 'scala', },
-{ label: 'Python', value: 'python', },
-]}
->
-
-<TabItem value="scala">
-
-```scala
-// spark-shell
-// reload data
-df.write.format("hudi").
- options(getQuickstartWriteConfigs).
- option(PRECOMBINE_FIELD_OPT_KEY, "ts").
- option(RECORDKEY_FIELD_OPT_KEY, "uuid").
- option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
- option(TABLE_NAME, tableName).
- mode(Overwrite).
- save(basePath)
-
-// read stream and output results to console
-spark.readStream.
- format("hudi").
- load(basePath).
- writeStream.
- format("console").
- start()
-
-// read stream to streaming df
-val df = spark.readStream.
- format("hudi").
- load(basePath)
-
-```
-
-</TabItem>
-<TabItem value="python">
-
```python
# pyspark
-# reload data
-inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
- dataGen.generateInserts(10))
-df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+# fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+# fetch two records to be deleted
+ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
-hudi_options = {
- 'hoodie.table.name': tableName,
- 'hoodie.datasource.write.recordkey.field': 'uuid',
- 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
- 'hoodie.datasource.write.table.name': tableName,
- 'hoodie.datasource.write.operation': 'upsert',
- 'hoodie.datasource.write.precombine.field': 'ts',
- 'hoodie.upsert.shuffle.parallelism': 2,
- 'hoodie.insert.shuffle.parallelism': 2
+# issue deletes
+hudi_hard_delete_options = {
+ 'hoodie.table.name': tableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': tableName,
+ 'hoodie.datasource.write.operation': 'delete',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
}
-df.write.format("hudi"). \
- options(**hudi_options). \
- mode("overwrite"). \
- save(basePath)
+from pyspark.sql.functions import lit
+deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
+hard_delete_df = spark.sparkContext.parallelize(deletes).toDF(['uuid',
'partitionpath']).withColumn('ts', lit(0.0))
+hard_delete_df.write.format("hudi"). \
+ options(**hudi_hard_delete_options). \
+ mode("append"). \
+ save(basePath)
-# read stream to streaming df
-df = spark.readStream \
- .format("hudi") \
- .load(basePath)
+# run the same read query as above.
+roAfterDeleteViewDF = spark. \
+ read. \
+ format("hudi"). \
+ load(basePath)
+```
-# ead stream and output results to console
-spark.readStream \
- .format("hudi") \
- .load(basePath) \
- .writeStream \
- .format("console") \
- .start()
+```python
+roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
+# fetch should return (total - 2) records
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
```
</TabItem>
</Tabs
>
-### Streaming Write
+## Advanced Querying capabilities
Review Comment:
I like this way of categorizing the content.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]