This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 3b3b034cdd [HUDI-4748][DOCS] Add examples of soft deletes in docs (#6547)
3b3b034cdd is described below
commit 3b3b034cdd7a344f1cb73d62d786192906f7e24e
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Aug 30 16:14:22 2022 -0700
[HUDI-4748][DOCS] Add examples of soft deletes in docs (#6547)
---
website/docs/quick-start-guide.md | 147 +++++++++++++++++++--
website/docs/writing_data.md | 30 +++++
.../version-0.10.0/quick-start-guide.md | 147 +++++++++++++++++++--
.../versioned_docs/version-0.10.0/writing_data.md | 30 +++++
.../version-0.10.1/quick-start-guide.md | 147 +++++++++++++++++++--
.../versioned_docs/version-0.10.1/writing_data.md | 30 +++++
.../version-0.11.0/quick-start-guide.md | 147 +++++++++++++++++++--
.../versioned_docs/version-0.11.0/writing_data.md | 30 +++++
.../version-0.11.1/quick-start-guide.md | 147 +++++++++++++++++++--
.../versioned_docs/version-0.11.1/writing_data.md | 30 +++++
.../version-0.12.0/quick-start-guide.md | 147 +++++++++++++++++++--
.../versioned_docs/version-0.12.0/writing_data.md | 30 +++++
12 files changed, 1008 insertions(+), 54 deletions(-)
diff --git a/website/docs/quick-start-guide.md
b/website/docs/quick-start-guide.md
index eb1f3596d2..5e6e4e6810 100644
--- a/website/docs/quick-start-guide.md
+++ b/website/docs/quick-start-guide.md
@@ -184,6 +184,7 @@ import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
@@ -958,6 +959,137 @@ spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hud
## Delete data {#deletes}
+Apache Hudi supports two types of deletes: (1) **Soft Deletes**: retaining the record key and just nulling out the values
+for all the other fields (records with nulls in soft deletes are always persisted in storage and never removed);
+(2) **Hard Deletes**: physically removing any trace of the record from the table. See the
+[deletion section](/docs/writing_data#deletes) of the writing data page for more details.
+
+### Soft Deletes
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', }
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+spark.
+ read.
+ format("hudi").
+ load(basePath).
+ createOrReplaceTempView("hudi_trips_snapshot")
+// fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+// fetch two records for soft deletes
+val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+// prepare the soft deletes by ensuring the appropriate fields are nullified
+val nullifyColumns = softDeleteDs.schema.fields.
+ map(field => (field.name, field.dataType.typeName)).
+ filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
+ && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
+
+val softDeleteDf = nullifyColumns.
+ foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
+ (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
+
+// simply upsert the table after setting these fields to null
+softDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "upsert").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+
+// reload data
+spark.
+ read.
+ format("hudi").
+ load(basePath).
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+// This should return the same total count as before
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+// This should return (total - 2) count as two records are updated with nulls
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+```
+:::note
+Notice that the save mode is `Append`.
+:::
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+from pyspark.sql.functions import lit
+from functools import reduce
+
+spark.read.format("hudi"). \
+ load(basePath). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+# fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+# fetch two records for soft deletes
+soft_delete_ds = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+# prepare the soft deletes by ensuring the appropriate fields are nullified
+meta_columns = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", \
+  "_hoodie_partition_path", "_hoodie_file_name"]
+excluded_columns = meta_columns + ["ts", "uuid", "partitionpath"]
+nullify_columns = list(filter(lambda field: field[0] not in excluded_columns, \
+  list(map(lambda field: (field.name, field.dataType), soft_delete_ds.schema.fields))))
+
+hudi_soft_delete_options = {
+ 'hoodie.table.name': tableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': tableName,
+ 'hoodie.datasource.write.operation': 'upsert',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
+}
+
+soft_delete_df = reduce(lambda df,col: df.withColumn(col[0], lit(None).cast(col[1])), \
+  nullify_columns, reduce(lambda df,col: df.drop(col[0]), meta_columns, soft_delete_ds))
+
+# simply upsert the table after setting these fields to null
+soft_delete_df.write.format("hudi"). \
+ options(**hudi_soft_delete_options). \
+ mode("append"). \
+ save(basePath)
+
+# reload data
+spark.read.format("hudi"). \
+ load(basePath). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+# This should return the same total count as before
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+# This should return (total - 2) count as two records are updated with nulls
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+```
+:::note
+Notice that the save mode is `Append`.
+:::
+</TabItem>
+
+</Tabs
+>
+
+
+### Hard Deletes
+
<Tabs
defaultValue="scala"
values={[
@@ -979,9 +1111,9 @@ val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(
// issue deletes
val deletes = dataGen.generateDeletes(ds.collectAsList())
-val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
+val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
-df.write.format("hudi").
+hardDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
@@ -1033,7 +1165,7 @@ spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
# issue deletes
-hudi_delete_options = {
+hudi_hard_delete_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
@@ -1046,9 +1178,9 @@ hudi_delete_options = {
from pyspark.sql.functions import lit
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
-df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
-df.write.format("hudi"). \
-  options(**hudi_delete_options). \
+hard_delete_df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
+hard_delete_df.write.format("hudi"). \
+ options(**hudi_hard_delete_options). \
mode("append"). \
save(basePath)
@@ -1069,9 +1201,6 @@ Only `Append` mode is supported for delete operation.
</Tabs
>
-
-See the [deletion section](/docs/writing_data#deletes) of the writing data page for more details.
-
## Insert Overwrite
Generate some new trips, overwrite all the partitions that are present in the input. This operation can be faster
diff --git a/website/docs/writing_data.md b/website/docs/writing_data.md
index 27ef443d95..51679a6cc8 100644
--- a/website/docs/writing_data.md
+++ b/website/docs/writing_data.md
@@ -296,6 +296,36 @@ For more info refer to [Delete support in Hudi](https://cwiki.apache.org/conflue
- **Soft Deletes** : Retain the record key and just null out the values for all the other fields.
  This can be achieved by ensuring the appropriate fields are nullable in the table schema and simply upserting the table after setting these fields to null.
+  Note that soft deletes are always persisted in storage and never removed, but all values are set to null.
+  So for GDPR or other compliance reasons, users should consider doing hard deletes if the record key and partition path contain PII.
+
+For example:
+```scala
+// fetch two records for soft deletes
+val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+// prepare the soft deletes by ensuring the appropriate fields are nullified
+val nullifyColumns = softDeleteDs.schema.fields.
+ map(field => (field.name, field.dataType.typeName)).
+ filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
+ && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
+
+val softDeleteDf = nullifyColumns.
+ foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
+ (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
+
+// simply upsert the table after setting these fields to null
+softDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "upsert").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+```
- **Hard Deletes** : A stronger form of deletion is to physically remove any trace of the record from the table. This can be achieved in 3 different ways.
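For example, a minimal sketch of one of those ways, the datasource `delete` write operation, reusing the quick-start helpers (`dataGen`, `tableName`, and `basePath` from the quick-start guide above):

```scala
// fetch the records to delete; only the record key and partition path are needed
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

// generate delete payloads for the selected keys
val deletes = dataGen.generateDeletes(ds.collectAsList())
val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

// write with the "delete" operation to physically remove the records
hardDeleteDf.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
```

As with soft deletes, only the `Append` save mode is supported for the delete operation.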
diff --git a/website/versioned_docs/version-0.10.0/quick-start-guide.md
b/website/versioned_docs/version-0.10.0/quick-start-guide.md
index 108b1071cd..6847e20a79 100644
--- a/website/versioned_docs/version-0.10.0/quick-start-guide.md
+++ b/website/versioned_docs/version-0.10.0/quick-start-guide.md
@@ -139,6 +139,7 @@ import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
@@ -860,6 +861,137 @@ spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hud
## Delete data {#deletes}
+Apache Hudi supports two types of deletes: (1) **Soft Deletes**: retaining the record key and just nulling out the values
+for all the other fields (records with nulls in soft deletes are always persisted in storage and never removed);
+(2) **Hard Deletes**: physically removing any trace of the record from the table. See the
+[deletion section](/docs/writing_data#deletes) of the writing data page for more details.
+
+### Soft Deletes
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', }
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+spark.
+ read.
+ format("hudi").
+ load(basePath).
+ createOrReplaceTempView("hudi_trips_snapshot")
+// fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+// fetch two records for soft deletes
+val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+// prepare the soft deletes by ensuring the appropriate fields are nullified
+val nullifyColumns = softDeleteDs.schema.fields.
+ map(field => (field.name, field.dataType.typeName)).
+ filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
+ && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
+
+val softDeleteDf = nullifyColumns.
+ foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
+ (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
+
+// simply upsert the table after setting these fields to null
+softDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "upsert").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+
+// reload data
+spark.
+ read.
+ format("hudi").
+ load(basePath).
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+// This should return the same total count as before
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+// This should return (total - 2) count as two records are updated with nulls
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+```
+:::note
+Notice that the save mode is `Append`.
+:::
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+from pyspark.sql.functions import lit
+from functools import reduce
+
+spark.read.format("hudi"). \
+ load(basePath). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+# fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+# fetch two records for soft deletes
+soft_delete_ds = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+# prepare the soft deletes by ensuring the appropriate fields are nullified
+meta_columns = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", \
+  "_hoodie_partition_path", "_hoodie_file_name"]
+excluded_columns = meta_columns + ["ts", "uuid", "partitionpath"]
+nullify_columns = list(filter(lambda field: field[0] not in excluded_columns, \
+  list(map(lambda field: (field.name, field.dataType), soft_delete_ds.schema.fields))))
+
+hudi_soft_delete_options = {
+ 'hoodie.table.name': tableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': tableName,
+ 'hoodie.datasource.write.operation': 'upsert',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
+}
+
+soft_delete_df = reduce(lambda df,col: df.withColumn(col[0], lit(None).cast(col[1])), \
+  nullify_columns, reduce(lambda df,col: df.drop(col[0]), meta_columns, soft_delete_ds))
+
+# simply upsert the table after setting these fields to null
+soft_delete_df.write.format("hudi"). \
+ options(**hudi_soft_delete_options). \
+ mode("append"). \
+ save(basePath)
+
+# reload data
+spark.read.format("hudi"). \
+ load(basePath). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+# This should return the same total count as before
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+# This should return (total - 2) count as two records are updated with nulls
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+```
+:::note
+Notice that the save mode is `Append`.
+:::
+</TabItem>
+
+</Tabs
+>
+
+
+### Hard Deletes
+
<Tabs
defaultValue="scala"
values={[
@@ -879,9 +1011,9 @@ val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(
// issue deletes
val deletes = dataGen.generateDeletes(ds.collectAsList())
-val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
+val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
-df.write.format("hudi").
+hardDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
@@ -930,7 +1062,7 @@ spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
# issue deletes
-hudi_delete_options = {
+hudi_hard_delete_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
@@ -943,9 +1075,9 @@ hudi_delete_options = {
from pyspark.sql.functions import lit
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
-df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
-df.write.format("hudi"). \
-  options(**hudi_delete_options). \
+hard_delete_df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
+hard_delete_df.write.format("hudi"). \
+ options(**hudi_hard_delete_options). \
mode("append"). \
save(basePath)
@@ -964,9 +1096,6 @@ Only `Append` mode is supported for delete operation.
</TabItem>
</Tabs>
-
-See the [deletion section](/docs/writing_data#deletes) of the writing data page for more details.
-
## Insert Overwrite
Generate some new trips, overwrite all the partitions that are present in the input. This operation can be faster
diff --git a/website/versioned_docs/version-0.10.0/writing_data.md
b/website/versioned_docs/version-0.10.0/writing_data.md
index 15fcc4d66b..24cc825265 100644
--- a/website/versioned_docs/version-0.10.0/writing_data.md
+++ b/website/versioned_docs/version-0.10.0/writing_data.md
@@ -296,6 +296,36 @@ For more info refer to [Delete support in Hudi](https://cwiki.apache.org/conflue
- **Soft Deletes** : Retain the record key and just null out the values for all the other fields.
  This can be achieved by ensuring the appropriate fields are nullable in the table schema and simply upserting the table after setting these fields to null.
+  Note that soft deletes are always persisted in storage and never removed, but all values are set to null.
+  So for GDPR or other compliance reasons, users should consider doing hard deletes if the record key and partition path contain PII.
+
+For example:
+```scala
+// fetch two records for soft deletes
+val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+// prepare the soft deletes by ensuring the appropriate fields are nullified
+val nullifyColumns = softDeleteDs.schema.fields.
+ map(field => (field.name, field.dataType.typeName)).
+ filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
+ && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
+
+val softDeleteDf = nullifyColumns.
+ foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
+ (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
+
+// simply upsert the table after setting these fields to null
+softDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "upsert").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+```
- **Hard Deletes** : A stronger form of deletion is to physically remove any trace of the record from the table. This can be achieved in 3 different ways.
diff --git a/website/versioned_docs/version-0.10.1/quick-start-guide.md
b/website/versioned_docs/version-0.10.1/quick-start-guide.md
index 42ce44162f..b65d786838 100644
--- a/website/versioned_docs/version-0.10.1/quick-start-guide.md
+++ b/website/versioned_docs/version-0.10.1/quick-start-guide.md
@@ -156,6 +156,7 @@ import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
@@ -877,6 +878,137 @@ spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hud
## Delete data {#deletes}
+Apache Hudi supports two types of deletes: (1) **Soft Deletes**: retaining the record key and just nulling out the values
+for all the other fields (records with nulls in soft deletes are always persisted in storage and never removed);
+(2) **Hard Deletes**: physically removing any trace of the record from the table. See the
+[deletion section](/docs/writing_data#deletes) of the writing data page for more details.
+
+### Soft Deletes
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', }
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+spark.
+ read.
+ format("hudi").
+ load(basePath).
+ createOrReplaceTempView("hudi_trips_snapshot")
+// fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+// fetch two records for soft deletes
+val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+// prepare the soft deletes by ensuring the appropriate fields are nullified
+val nullifyColumns = softDeleteDs.schema.fields.
+ map(field => (field.name, field.dataType.typeName)).
+ filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
+ && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
+
+val softDeleteDf = nullifyColumns.
+ foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
+ (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
+
+// simply upsert the table after setting these fields to null
+softDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "upsert").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+
+// reload data
+spark.
+ read.
+ format("hudi").
+ load(basePath).
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+// This should return the same total count as before
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+// This should return (total - 2) count as two records are updated with nulls
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+```
+:::note
+Notice that the save mode is `Append`.
+:::
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+from pyspark.sql.functions import lit
+from functools import reduce
+
+spark.read.format("hudi"). \
+ load(basePath). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+# fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+# fetch two records for soft deletes
+soft_delete_ds = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+# prepare the soft deletes by ensuring the appropriate fields are nullified
+meta_columns = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", \
+  "_hoodie_partition_path", "_hoodie_file_name"]
+excluded_columns = meta_columns + ["ts", "uuid", "partitionpath"]
+nullify_columns = list(filter(lambda field: field[0] not in excluded_columns, \
+  list(map(lambda field: (field.name, field.dataType), soft_delete_ds.schema.fields))))
+
+hudi_soft_delete_options = {
+ 'hoodie.table.name': tableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': tableName,
+ 'hoodie.datasource.write.operation': 'upsert',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
+}
+
+soft_delete_df = reduce(lambda df,col: df.withColumn(col[0], lit(None).cast(col[1])), \
+  nullify_columns, reduce(lambda df,col: df.drop(col[0]), meta_columns, soft_delete_ds))
+
+# simply upsert the table after setting these fields to null
+soft_delete_df.write.format("hudi"). \
+ options(**hudi_soft_delete_options). \
+ mode("append"). \
+ save(basePath)
+
+# reload data
+spark.read.format("hudi"). \
+ load(basePath). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+# This should return the same total count as before
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+# This should return (total - 2) count as two records are updated with nulls
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+```
+:::note
+Notice that the save mode is `Append`.
+:::
+</TabItem>
+
+</Tabs
+>
+
+
+### Hard Deletes
+
<Tabs
defaultValue="scala"
values={[
@@ -896,9 +1028,9 @@ val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(
// issue deletes
val deletes = dataGen.generateDeletes(ds.collectAsList())
-val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
+val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
-df.write.format("hudi").
+hardDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
@@ -947,7 +1079,7 @@ spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
# issue deletes
-hudi_delete_options = {
+hudi_hard_delete_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
@@ -960,9 +1092,9 @@ hudi_delete_options = {
from pyspark.sql.functions import lit
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
-df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
-df.write.format("hudi"). \
-  options(**hudi_delete_options). \
+hard_delete_df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
+hard_delete_df.write.format("hudi"). \
+ options(**hudi_hard_delete_options). \
mode("append"). \
save(basePath)
@@ -981,9 +1113,6 @@ Only `Append` mode is supported for delete operation.
</TabItem>
</Tabs>
-
-See the [deletion section](/docs/writing_data#deletes) of the writing data page for more details.
-
## Insert Overwrite
Generate some new trips, overwrite all the partitions that are present in the input. This operation can be faster
diff --git a/website/versioned_docs/version-0.10.1/writing_data.md
b/website/versioned_docs/version-0.10.1/writing_data.md
index 15fcc4d66b..24cc825265 100644
--- a/website/versioned_docs/version-0.10.1/writing_data.md
+++ b/website/versioned_docs/version-0.10.1/writing_data.md
@@ -296,6 +296,36 @@ For more info refer to [Delete support in Hudi](https://cwiki.apache.org/conflue
- **Soft Deletes** : Retain the record key and just null out the values for all the other fields.
  This can be achieved by ensuring the appropriate fields are nullable in the table schema and simply upserting the table after setting these fields to null.
+  Note that soft deletes are always persisted in storage and never removed, but all values are set to null.
+  So for GDPR or other compliance reasons, users should consider doing hard deletes if the record key and partition path contain PII.
+
+For example:
+```scala
+// fetch two records for soft deletes
+val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+// prepare the soft deletes by ensuring the appropriate fields are nullified
+val nullifyColumns = softDeleteDs.schema.fields.
+ map(field => (field.name, field.dataType.typeName)).
+ filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
+ && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
+
+val softDeleteDf = nullifyColumns.
+ foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
+ (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
+
+// simply upsert the table after setting these fields to null
+softDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "upsert").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+```
- **Hard Deletes** : A stronger form of deletion is to physically remove any trace of the record from the table. This can be achieved in 3 different ways.
diff --git a/website/versioned_docs/version-0.11.0/quick-start-guide.md
b/website/versioned_docs/version-0.11.0/quick-start-guide.md
index 14166b6fe5..c2d840aae9 100644
--- a/website/versioned_docs/version-0.11.0/quick-start-guide.md
+++ b/website/versioned_docs/version-0.11.0/quick-start-guide.md
@@ -158,6 +158,7 @@ import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
@@ -925,6 +926,137 @@ spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hud
## Delete data {#deletes}
+Apache Hudi supports two types of deletes: (1) **Soft Deletes**: retaining the record key and just nulling out the values
+for all the other fields (records with nulls in soft deletes are always persisted in storage and never removed);
+(2) **Hard Deletes**: physically removing any trace of the record from the table. See the
+[deletion section](/docs/writing_data#deletes) of the writing data page for more details.
+
+### Soft Deletes
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', }
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+spark.
+ read.
+ format("hudi").
+ load(basePath).
+ createOrReplaceTempView("hudi_trips_snapshot")
+// fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+// fetch two records for soft deletes
+val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+// prepare the soft deletes by ensuring the appropriate fields are nullified
+val nullifyColumns = softDeleteDs.schema.fields.
+ map(field => (field.name, field.dataType.typeName)).
+ filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
+ && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
+
+val softDeleteDf = nullifyColumns.
+ foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
+ (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
+
+// simply upsert the table after setting these fields to null
+softDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "upsert").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+
+// reload data
+spark.
+ read.
+ format("hudi").
+ load(basePath).
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+// This should return the same total count as before
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+// This should return (total - 2) count as two records are updated with nulls
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+```
+:::note
+Notice that the save mode is `Append`.
+:::
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+from pyspark.sql.functions import lit
+from functools import reduce
+
+spark.read.format("hudi"). \
+ load(basePath). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+# fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+# fetch two records for soft deletes
+soft_delete_ds = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+# prepare the soft deletes by ensuring the appropriate fields are nullified
+meta_columns = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", \
+  "_hoodie_partition_path", "_hoodie_file_name"]
+excluded_columns = meta_columns + ["ts", "uuid", "partitionpath"]
+nullify_columns = list(filter(lambda field: field[0] not in excluded_columns, \
+  list(map(lambda field: (field.name, field.dataType), soft_delete_ds.schema.fields))))
+
+hudi_soft_delete_options = {
+ 'hoodie.table.name': tableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': tableName,
+ 'hoodie.datasource.write.operation': 'upsert',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
+}
+
+soft_delete_df = reduce(lambda df,col: df.withColumn(col[0], lit(None).cast(col[1])), \
+  nullify_columns, reduce(lambda df,col: df.drop(col[0]), meta_columns, soft_delete_ds))
+
+# simply upsert the table after setting these fields to null
+soft_delete_df.write.format("hudi"). \
+ options(**hudi_soft_delete_options). \
+ mode("append"). \
+ save(basePath)
+
+# reload data
+spark.read.format("hudi"). \
+ load(basePath). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+# This should return the same total count as before
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+# This should return (total - 2) count as two records are updated with nulls
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+```
+:::note
+Notice that the save mode is `Append`.
+:::
+</TabItem>
+
+</Tabs
+>
+
+
+### Hard Deletes
+
<Tabs
defaultValue="scala"
values={[
@@ -946,9 +1078,9 @@ val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(
// issue deletes
val deletes = dataGen.generateDeletes(ds.collectAsList())
-val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
+val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
-df.write.format("hudi").
+hardDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
@@ -1000,7 +1132,7 @@ spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
# issue deletes
-hudi_delete_options = {
+hudi_hard_delete_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
@@ -1013,9 +1145,9 @@ hudi_delete_options = {
from pyspark.sql.functions import lit
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
-df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
-df.write.format("hudi"). \
-  options(**hudi_delete_options). \
+hard_delete_df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
+hard_delete_df.write.format("hudi"). \
+ options(**hudi_hard_delete_options). \
mode("append"). \
save(basePath)
@@ -1036,9 +1168,6 @@ Only `Append` mode is supported for delete operation.
</Tabs
>
-
-See the [deletion section](/docs/writing_data#deletes) of the writing data page for more details.
-
## Insert Overwrite
Generate some new trips, overwrite all the partitions that are present in the input. This operation can be faster
diff --git a/website/versioned_docs/version-0.11.0/writing_data.md
b/website/versioned_docs/version-0.11.0/writing_data.md
index 3c0a516e2c..792db27971 100644
--- a/website/versioned_docs/version-0.11.0/writing_data.md
+++ b/website/versioned_docs/version-0.11.0/writing_data.md
@@ -296,6 +296,36 @@ For more info refer to [Delete support in Hudi](https://cwiki.apache.org/conflue
- **Soft Deletes** : Retain the record key and just null out the values for all the other fields.
  This can be achieved by ensuring the appropriate fields are nullable in the table schema and simply upserting the table after setting these fields to null.
+  Note that soft deletes are always persisted in storage and never removed, but all values are set to null.
+  So for GDPR or other compliance reasons, users should consider doing hard deletes if the record key and partition path contain PII.
+
+For example:
+```scala
+// fetch two records for soft deletes
+val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+// prepare the soft deletes by ensuring the appropriate fields are nullified
+val nullifyColumns = softDeleteDs.schema.fields.
+ map(field => (field.name, field.dataType.typeName)).
+ filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
+ && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
+
+val softDeleteDf = nullifyColumns.
+ foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
+ (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
+
+// simply upsert the table after setting these fields to null
+softDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "upsert").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+```
- **Hard Deletes** : A stronger form of deletion is to physically remove any trace of the record from the table. This can be achieved in 3 different ways.
diff --git a/website/versioned_docs/version-0.11.1/quick-start-guide.md
b/website/versioned_docs/version-0.11.1/quick-start-guide.md
index b9894a51a7..d7d112d457 100644
--- a/website/versioned_docs/version-0.11.1/quick-start-guide.md
+++ b/website/versioned_docs/version-0.11.1/quick-start-guide.md
@@ -158,6 +158,7 @@ import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
@@ -925,6 +926,137 @@ spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hud
## Delete data {#deletes}
+Apache Hudi supports two types of deletes: (1) **Soft Deletes**: retaining the record key and just nulling out the values
+for all the other fields (records with nulls in soft deletes are always persisted in storage and never removed);
+(2) **Hard Deletes**: physically removing any trace of the record from the table. See the
+[deletion section](/docs/writing_data#deletes) of the writing data page for more details.
+
+### Soft Deletes
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', }
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+spark.
+ read.
+ format("hudi").
+ load(basePath).
+ createOrReplaceTempView("hudi_trips_snapshot")
+// fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+// fetch two records for soft deletes
+val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+// prepare the soft deletes by ensuring the appropriate fields are nullified
+val nullifyColumns = softDeleteDs.schema.fields.
+ map(field => (field.name, field.dataType.typeName)).
+ filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
+ && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
+
+val softDeleteDf = nullifyColumns.
+ foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
+ (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
+
+// simply upsert the table after setting these fields to null
+softDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "upsert").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+
+// reload data
+spark.
+ read.
+ format("hudi").
+ load(basePath).
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+// This should return the same total count as before
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+// This should return (total - 2) count as two records are updated with nulls
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+```
+:::note
+Notice that the save mode is `Append`.
+:::
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+from pyspark.sql.functions import lit
+from functools import reduce
+
+spark.read.format("hudi"). \
+ load(basePath). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+# fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+# fetch two records for soft deletes
+soft_delete_ds = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+# prepare the soft deletes by ensuring the appropriate fields are nullified
+meta_columns = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", \
+  "_hoodie_partition_path", "_hoodie_file_name"]
+excluded_columns = meta_columns + ["ts", "uuid", "partitionpath"]
+nullify_columns = list(filter(lambda field: field[0] not in excluded_columns, \
+  list(map(lambda field: (field.name, field.dataType), soft_delete_ds.schema.fields))))
+
+hudi_soft_delete_options = {
+ 'hoodie.table.name': tableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': tableName,
+ 'hoodie.datasource.write.operation': 'upsert',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
+}
+
+soft_delete_df = reduce(lambda df,col: df.withColumn(col[0], lit(None).cast(col[1])), \
+  nullify_columns, reduce(lambda df,col: df.drop(col[0]), meta_columns, soft_delete_ds))
+
+# simply upsert the table after setting these fields to null
+soft_delete_df.write.format("hudi"). \
+ options(**hudi_soft_delete_options). \
+ mode("append"). \
+ save(basePath)
+
+# reload data
+spark.read.format("hudi"). \
+ load(basePath). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+# This should return the same total count as before
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+# This should return (total - 2) count as two records are updated with nulls
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+```
+:::note
+Notice that the save mode is `Append`.
+:::
+</TabItem>
+
+</Tabs
+>
+
+
+### Hard Deletes
+
<Tabs
defaultValue="scala"
values={[
@@ -946,9 +1078,9 @@ val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(
// issue deletes
val deletes = dataGen.generateDeletes(ds.collectAsList())
-val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
+val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
-df.write.format("hudi").
+hardDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
@@ -1000,7 +1132,7 @@ spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
# issue deletes
-hudi_delete_options = {
+hudi_hard_delete_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
@@ -1013,9 +1145,9 @@ hudi_delete_options = {
from pyspark.sql.functions import lit
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
-df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
-df.write.format("hudi"). \
-  options(**hudi_delete_options). \
+hard_delete_df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
+hard_delete_df.write.format("hudi"). \
+ options(**hudi_hard_delete_options). \
mode("append"). \
save(basePath)
@@ -1036,9 +1168,6 @@ Only `Append` mode is supported for delete operation.
</Tabs
>
-
-See the [deletion section](/docs/writing_data#deletes) of the writing data page for more details.
-
## Insert Overwrite
Generate some new trips, overwrite all the partitions that are present in the input. This operation can be faster
diff --git a/website/versioned_docs/version-0.11.1/writing_data.md
b/website/versioned_docs/version-0.11.1/writing_data.md
index 3c0a516e2c..792db27971 100644
--- a/website/versioned_docs/version-0.11.1/writing_data.md
+++ b/website/versioned_docs/version-0.11.1/writing_data.md
@@ -296,6 +296,36 @@ For more info refer to [Delete support in Hudi](https://cwiki.apache.org/conflue
- **Soft Deletes** : Retain the record key and just null out the values for all the other fields.
  This can be achieved by ensuring the appropriate fields are nullable in the table schema and simply upserting the table after setting these fields to null.
+  Note that soft deletes are always persisted in storage and never removed, but all values are set to null.
+  So for GDPR or other compliance reasons, users should consider doing hard deletes if the record key and partition path contain PII.
+
+For example:
+```scala
+// fetch two records for soft deletes
+val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+// prepare the soft deletes by ensuring the appropriate fields are nullified
+val nullifyColumns = softDeleteDs.schema.fields.
+ map(field => (field.name, field.dataType.typeName)).
+ filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
+ && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
+
+val softDeleteDf = nullifyColumns.
+ foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
+ (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
+
+// simply upsert the table after setting these fields to null
+softDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "upsert").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+```
- **Hard Deletes** : A stronger form of deletion is to physically remove any trace of the record from the table. This can be achieved in 3 different ways.
diff --git a/website/versioned_docs/version-0.12.0/quick-start-guide.md
b/website/versioned_docs/version-0.12.0/quick-start-guide.md
index 145c17f843..494be9858d 100644
--- a/website/versioned_docs/version-0.12.0/quick-start-guide.md
+++ b/website/versioned_docs/version-0.12.0/quick-start-guide.md
@@ -184,6 +184,7 @@ import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
@@ -958,6 +959,137 @@ spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hud
## Delete data {#deletes}
+Apache Hudi supports two types of deletes: (1) **Soft Deletes**: retaining the record key and just nulling out the values
+for all the other fields (records with nulls in soft deletes are always persisted in storage and never removed);
+(2) **Hard Deletes**: physically removing any trace of the record from the table. See the
+[deletion section](/docs/writing_data#deletes) of the writing data page for more details.
+
+### Soft Deletes
+
+<Tabs
+defaultValue="scala"
+values={[
+{ label: 'Scala', value: 'scala', },
+{ label: 'Python', value: 'python', }
+]}
+>
+
+<TabItem value="scala">
+
+```scala
+// spark-shell
+spark.
+ read.
+ format("hudi").
+ load(basePath).
+ createOrReplaceTempView("hudi_trips_snapshot")
+// fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+// fetch two records for soft deletes
+val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+// prepare the soft deletes by ensuring the appropriate fields are nullified
+val nullifyColumns = softDeleteDs.schema.fields.
+ map(field => (field.name, field.dataType.typeName)).
+ filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
+ && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
+
+val softDeleteDf = nullifyColumns.
+ foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
+ (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
+
+// simply upsert the table after setting these fields to null
+softDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "upsert").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+
+// reload data
+spark.
+ read.
+ format("hudi").
+ load(basePath).
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+// This should return the same total count as before
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+// This should return (total - 2) count as two records are updated with nulls
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+```
+:::note
+Notice that the save mode is `Append`.
+:::
+</TabItem>
+<TabItem value="python">
+
+```python
+# pyspark
+from pyspark.sql.functions import lit
+from functools import reduce
+
+spark.read.format("hudi"). \
+ load(basePath). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+# fetch total records count
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+# fetch two records for soft deletes
+soft_delete_ds = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+# prepare the soft deletes by ensuring the appropriate fields are nullified
+meta_columns = ["_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", \
+  "_hoodie_partition_path", "_hoodie_file_name"]
+excluded_columns = meta_columns + ["ts", "uuid", "partitionpath"]
+nullify_columns = list(filter(lambda field: field[0] not in excluded_columns, \
+  list(map(lambda field: (field.name, field.dataType), soft_delete_ds.schema.fields))))
+
+hudi_soft_delete_options = {
+ 'hoodie.table.name': tableName,
+ 'hoodie.datasource.write.recordkey.field': 'uuid',
+ 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
+ 'hoodie.datasource.write.table.name': tableName,
+ 'hoodie.datasource.write.operation': 'upsert',
+ 'hoodie.datasource.write.precombine.field': 'ts',
+ 'hoodie.upsert.shuffle.parallelism': 2,
+ 'hoodie.insert.shuffle.parallelism': 2
+}
+
+soft_delete_df = reduce(lambda df,col: df.withColumn(col[0], lit(None).cast(col[1])), \
+  nullify_columns, reduce(lambda df,col: df.drop(col[0]), meta_columns, soft_delete_ds))
+
+# simply upsert the table after setting these fields to null
+soft_delete_df.write.format("hudi"). \
+ options(**hudi_soft_delete_options). \
+ mode("append"). \
+ save(basePath)
+
+# reload data
+spark.read.format("hudi"). \
+ load(basePath). \
+ createOrReplaceTempView("hudi_trips_snapshot")
+
+# This should return the same total count as before
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
+# This should return (total - 2) count as two records are updated with nulls
+spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
+```
+:::note
+Notice that the save mode is `Append`.
+:::
+</TabItem>
+
+</Tabs
+>
+
+
+### Hard Deletes
+
<Tabs
defaultValue="scala"
values={[
@@ -979,9 +1111,9 @@ val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(
// issue deletes
val deletes = dataGen.generateDeletes(ds.collectAsList())
-val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
+val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
-df.write.format("hudi").
+hardDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
@@ -1033,7 +1165,7 @@ spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
# issue deletes
-hudi_delete_options = {
+hudi_hard_delete_options = {
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
@@ -1046,9 +1178,9 @@ hudi_delete_options = {
from pyspark.sql.functions import lit
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
-df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
-df.write.format("hudi"). \
-  options(**hudi_delete_options). \
+hard_delete_df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
+hard_delete_df.write.format("hudi"). \
+ options(**hudi_hard_delete_options). \
mode("append"). \
save(basePath)
@@ -1069,9 +1201,6 @@ Only `Append` mode is supported for delete operation.
</Tabs
>
-
-See the [deletion section](/docs/writing_data#deletes) of the writing data page for more details.
-
## Insert Overwrite
Generate some new trips, overwrite all the partitions that are present in the input. This operation can be faster
diff --git a/website/versioned_docs/version-0.12.0/writing_data.md
b/website/versioned_docs/version-0.12.0/writing_data.md
index 27ef443d95..59ebdcff67 100644
--- a/website/versioned_docs/version-0.12.0/writing_data.md
+++ b/website/versioned_docs/version-0.12.0/writing_data.md
@@ -296,6 +296,36 @@ For more info refer to [Delete support in Hudi](https://cwiki.apache.org/conflue
- **Soft Deletes** : Retain the record key and just null out the values for all the other fields.
  This can be achieved by ensuring the appropriate fields are nullable in the table schema and simply upserting the table after setting these fields to null.
+  Note that soft deletes are always persisted in storage and never removed, but all values are set to null.
+  So for GDPR or other compliance reasons, users should consider doing hard deletes if the record key and partition path contain PII.
+
+For example:
+```scala
+// fetch two records for soft deletes
+val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
+
+// prepare the soft deletes by ensuring the appropriate fields are nullified
+val nullifyColumns = softDeleteDs.schema.fields.
+ map(field => (field.name, field.dataType.typeName)).
+ filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
+ && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
+
+val softDeleteDf = nullifyColumns.
+ foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
+ (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
+
+// simply upsert the table after setting these fields to null
+softDeleteDf.write.format("hudi").
+ options(getQuickstartWriteConfigs).
+ option(OPERATION_OPT_KEY, "upsert").
+ option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+ option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+ option(TABLE_NAME, tableName).
+ mode(Append).
+ save(basePath)
+```
- **Hard Deletes** : A stronger form of deletion is to physically remove any trace of the record from the table. This can be achieved in 3 different ways.