Ashwanisr opened a new issue #2724:
URL: https://github.com/apache/hudi/issues/2724
Spark 2.4.4 runtime
**Upsert function:**
```scala
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode.Append

def upsert(albumDf: DataFrame, tableName: String, key: String, combineKey: String) = {
  albumDf.write
    .format("hudi")
    .option(TABLE_TYPE_OPT_KEY, COW_TABLE_TYPE_OPT_VAL)
    .option(RECORDKEY_FIELD_OPT_KEY, key)
    .option(PRECOMBINE_FIELD_OPT_KEY, combineKey)
    .option(TABLE_NAME, tableName)
    .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")
    .option("hoodie.keep.min.commits", "3")
    .option("hoodie.keep.max.commits", "4")
    .option("hoodie.cleaner.commits.retained", "2")
    .option("hoodie.clean.automatic", "true")
    .mode(Append)
    .save(s"$path")
}
```
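For context on how `PRECOMBINE_FIELD_OPT_KEY` behaves during an upsert: when several incoming records share a record key, Hudi keeps the one with the largest precombine value. A minimal pure-Scala sketch of that deduplication (the `Album` case class and data below are made up for illustration; this is not Hudi's code):

```scala
// Sketch of upsert precombine semantics: among incoming records with the same
// record key, the record with the largest precombine field value survives.
// The Album case class is a hypothetical stand-in for the DataFrame rows.
object PrecombineSketch {
  case class Album(albumId: Int, title: String, updateDate: Long)

  def precombine(records: Seq[Album]): Seq[Album] =
    records
      .groupBy(_.albumId)          // group by the record key field
      .values
      .map(_.maxBy(_.updateDate))  // keep the latest record per key
      .toSeq
      .sortBy(_.albumId)
}
```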
Using this upsert function I performed 4 upserts. The table state and files are:
```
hudi/Album/ 0
hudi/Album/.hoodie/ 0
hudi/Album/.hoodie/.aux/ 0
hudi/Album/.hoodie/.aux/.bootstrap/ 0
hudi/Album/.hoodie/.aux/.bootstrap/.fileids/ 0
hudi/Album/.hoodie/.aux/.bootstrap/.partitions/ 0
hudi/Album/.hoodie/.temp/ 0
hudi/Album/.hoodie/20210325102516.commit 1563
hudi/Album/.hoodie/20210325102516.commit.requested 0
hudi/Album/.hoodie/20210325102516.inflight 976
hudi/Album/.hoodie/20210325103000.commit 1573
hudi/Album/.hoodie/20210325103000.commit.requested 0
hudi/Album/.hoodie/20210325103000.inflight 976
hudi/Album/.hoodie/20210325104112.commit 1576
hudi/Album/.hoodie/20210325104112.commit.requested 0
hudi/Album/.hoodie/20210325104112.inflight 1636
hudi/Album/.hoodie/20210325104451.clean 1473
hudi/Album/.hoodie/20210325104451.clean.inflight 1442
hudi/Album/.hoodie/20210325104451.clean.requested 1442
hudi/Album/.hoodie/20210325104451.commit 1576
hudi/Album/.hoodie/20210325104451.commit.requested 0
hudi/Album/.hoodie/20210325104451.inflight 976
hudi/Album/.hoodie/archived/ 0
hudi/Album/.hoodie/hoodie.properties 226
hudi/Album/default/ 0
hudi/Album/default/.hoodie_partition_metadata 93
hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-101-491_20210325104112.parquet 434836
hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-132-517_20210325104451.parquet 434911
hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-53-50_20210325103000.parquet 434781
```
Table:
```
+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|             title|              tracks|updateDate|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
|     20210325102516|  20210325102516_0_1|               801|               default|1fc8c6f5-8a27-421...|    801| Hail to the Thief| [2+2=5, Backdrifts]|     18231|
|     20210325102516|  20210325102516_0_2|               800|               default|1fc8c6f5-8a27-421...|    800|   6 String Theory|[Lay it down, Am ...|     18231|
|     20210325103000|  20210325103000_0_1|               802|               default|1fc8c6f5-8a27-421...|    802|Best Of Jazz Blues|[Jumpin' the blue...|     18265|
|     20210325104112|  20210325104112_0_3|               803|               default|1fc8c6f5-8a27-421...|    803|                 7|[Lay it down, Am ...|     18231|
|     20210325104451|  20210325104451_0_4|               804|               default|1fc8c6f5-8a27-421...|    804|                 7|[Lay it down, Am ...|     18231|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
```
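One thing worth noting in the listing above: even with `hoodie.cleaner.commits.retained` set to 2, three parquet file versions survive after four commits, which is consistent with the cleaner keeping the file slices of the retained commits plus one earlier slice. A rough sketch of that retention rule (my assumption for illustration, not Hudi's actual cleaner logic):

```scala
// Rough sketch (not Hudi's cleaner code): with KEEP_LATEST_COMMITS and
// commits.retained = N, file slices older than the last N commits plus one
// extra slice become candidates for cleaning. Instant times are
// yyyyMMddHHmmss strings, so lexicographic order matches time order.
object CleanerSketch {
  def candidatesForCleaning(commitTimes: Seq[String], retained: Int): Seq[String] =
    commitTimes.sorted.dropRight(retained + 1)
}
```

Applied to the four commit times above with `retained = 2`, only `20210325102516` is a candidate, matching the listing where that commit's parquet file is gone.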
Next I perform a delete operation using the function below:
```scala
def delete(albumDf: DataFrame, tableName: String, key: String, combineKey: String) = {
  albumDf.write
    .format("hudi")
    .option(TABLE_TYPE_OPT_KEY, COW_TABLE_TYPE_OPT_VAL)
    .option(RECORDKEY_FIELD_OPT_KEY, key)
    .option(PRECOMBINE_FIELD_OPT_KEY, combineKey)
    .option(TABLE_NAME, tableName)
    .option(OPERATION_OPT_KEY, "delete")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .option("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS")
    .option("hoodie.keep.min.commits", "3")
    .option("hoodie.keep.max.commits", "4")
    .option("hoodie.cleaner.commits.retained", "2")
    .option("hoodie.clean.automatic", "true")
    .mode(Append)
    .save(s"$path")
}
```
and the commands given below:
```scala
import org.apache.hudi.DataSourceReadOptions._

val dataPoint = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, "0").
  option(END_INSTANTTIME_OPT_KEY, "20220319000000").
  load(path)
// create a view of the table to run SQL queries against if required
dataPoint.createOrReplaceTempView("hudi_data_point_in_time")
// the entire row isn't strictly necessary; only the record keys are needed for the delete
val ds = spark.sql("select * from hudi_data_point_in_time where albumId = 800")
ds.show()
```
```
+-------------------+--------------------+------------------+----------------------+--------------------+-------+---------------+--------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|          title|              tracks|updateDate|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+---------------+--------------------+----------+
|     20210325102516|  20210325102516_0_2|               800|               default|1fc8c6f5-8a27-421...|    800|6 String Theory|[Lay it down, Am ...|     18231|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+---------------+--------------------+----------+
```
`delete(ds, tableName, "albumId", "updateDate")`
Files after the delete operation are:
```
hudi/Album/ 0
hudi/Album/.hoodie/ 0
hudi/Album/.hoodie/.aux/ 0
hudi/Album/.hoodie/.aux/.bootstrap/ 0
hudi/Album/.hoodie/.aux/.bootstrap/.fileids/ 0
hudi/Album/.hoodie/.aux/.bootstrap/.partitions/ 0
hudi/Album/.hoodie/.temp/ 0
hudi/Album/.hoodie/20210325104112.commit 1576
hudi/Album/.hoodie/20210325104112.commit.requested 0
hudi/Album/.hoodie/20210325104112.inflight 1636
hudi/Album/.hoodie/20210325104451.clean 1473
hudi/Album/.hoodie/20210325104451.clean.inflight 1442
hudi/Album/.hoodie/20210325104451.clean.requested 1442
hudi/Album/.hoodie/20210325104451.commit 1576
hudi/Album/.hoodie/20210325104451.commit.requested 0
hudi/Album/.hoodie/20210325104451.inflight 976
hudi/Album/.hoodie/20210326054248.clean 1473
hudi/Album/.hoodie/20210326054248.clean.inflight 1442
hudi/Album/.hoodie/20210326054248.clean.requested 1442
hudi/Album/.hoodie/20210326054248.commit 1573
hudi/Album/.hoodie/20210326054248.commit.requested 0
hudi/Album/.hoodie/20210326054248.inflight 1636
hudi/Album/.hoodie/archived/ 0
hudi/Album/.hoodie/archived/.commits_.archive.1_1-0-1 9785
hudi/Album/.hoodie/hoodie.properties 226
hudi/Album/default/ 0
hudi/Album/default/.hoodie_partition_metadata 93
hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-101-491_20210325104112.parquet 434836
hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-132-517_20210325104451.parquet 434911
hudi/Album/default/1fc8c6f5-8a27-421f-9a1c-f55069522179-0_0-32-22_20210326054248.parquet 434766
```
and the table state is:
```
+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|             title|              tracks|updateDate|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
|     20210325102516|  20210325102516_0_1|               801|               default|1fc8c6f5-8a27-421...|    801| Hail to the Thief| [2+2=5, Backdrifts]|     18231|
|     20210325103000|  20210325103000_0_1|               802|               default|1fc8c6f5-8a27-421...|    802|Best Of Jazz Blues|[Jumpin' the blue...|     18265|
|     20210325104112|  20210325104112_0_3|               803|               default|1fc8c6f5-8a27-421...|    803|                 7|[Lay it down, Am ...|     18231|
|     20210325104451|  20210325104451_0_4|               804|               default|1fc8c6f5-8a27-421...|    804|                 7|[Lay it down, Am ...|     18231|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------------+--------------------+----------+
```
Now I run an incremental query from begin time 0 to the last upsert timestamp, which should list rows 801, 802, 803 and 804, but it only lists rows 803 and 804:
```scala
spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, "0").
  option(END_INSTANTTIME_OPT_KEY, "20210325104452").
  load(path).
  show()
```
```
+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+--------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|albumId|title|              tracks|updateDate|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+--------------------+----------+
|     20210325104112|  20210325104112_0_3|               803|               default|1fc8c6f5-8a27-421...|    803|    7|[Lay it down, Am ...|     18231|
|     20210325104451|  20210325104451_0_4|               804|               default|1fc8c6f5-8a27-421...|    804|    7|[Lay it down, Am ...|     18231|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+--------------------+----------+
```
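For reference, my understanding of the incremental query semantics (an assumption on my part, not taken from Hudi internals) is that it returns records whose `_hoodie_commit_time` falls in the half-open range `(BEGIN_INSTANTTIME_OPT_KEY, END_INSTANTTIME_OPT_KEY]`, and by that range check alone all four commit times fall inside `(0, 20210325104452]`:

```scala
// Sketch of the instant-time range check an incremental query conceptually
// applies (an assumption about semantics, not Hudi internals). Instant times
// are yyyyMMddHHmmss strings, so string comparison matches time order.
object IncrementalSketch {
  def inRange(commitTime: String, begin: String, end: String): Boolean =
    commitTime > begin && commitTime <= end
}
```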
Is there any reason why rows 801 and 802 are skipped by the incremental query? If I perform another upsert instead of the delete, those rows are listed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]