FredMkl opened a new issue, #6591:
URL: https://github.com/apache/hudi/issues/6591
**Describe the problem you faced**

We use a MOR table. We found that updating an existing set of rows into another partition results in both a) a new base parquet file being generated and b) an update being written to a log file. This produces duplicate records.
**To Reproduce**
```
//action1: spark-dataframe write
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.lit  // needed for withColumn("dt", lit(...)) below
import scala.collection.mutable
val tableName = "f_schedule_test"
val basePath = "oss://nbadatalake-poc/fred/warehouse/dw/f_schedule_test"
val spark = SparkSession.builder.enableHiveSupport.getOrCreate
import spark.implicits._
// spark-shell
val df = Seq(
("1", "10001", "2022-08-30","2022-08-30 12:00:00.000","2022-08-30"),
("2", "10002", "2022-08-31","2022-08-30 12:00:00.000","2022-08-30"),
("3", "10003", "2022-08-31","2022-08-30 12:00:00.000","2022-08-30"),
("4", "10004", "2022-08-31","2022-08-30 12:00:00.000","2022-08-30"),
("5", "10005", "2022-08-31","2022-08-30 12:00:00.000","2022-08-30"),
("6", "10006", "2022-08-31","2022-08-30 12:00:00.000","2022-08-30")
).toDF("game_schedule_id", "game_id", "game_date_cn", "insert_date",
"dt")
// df.show()
val hudiOptions = mutable.Map(
"hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
"hoodie.datasource.write.operation" -> "upsert",
"hoodie.datasource.write.recordkey.field" -> "game_schedule_id",
"hoodie.datasource.write.precombine.field" -> "insert_date",
"hoodie.datasource.write.partitionpath.field" -> "dt",
"hoodie.index.type" -> "GLOBAL_BLOOM",
"hoodie.compact.inline" -> "true",
"hoodie.datasource.write.keygenerator.class" ->
"org.apache.hudi.keygen.ComplexKeyGenerator"
)
//step1: insert --no issue
df.write.format("hudi").
options(hudiOptions).
mode(Append).
save(basePath)
//step2: move part data to another partition --no issue
val df1 = spark.sql("select * from dw.f_schedule_test where dt = '2022-08-30'").withColumn("dt", lit("2022-08-31")).limit(3)
df1.write.format("hudi").
options(hudiOptions).
mode(Append).
save(basePath)
//step3: move back --duplicate occurs
//Per the Hudi docs: updating an existing set of rows will result in either
//a) a companion log/delta file for an existing base parquet file generated
//from a previous compaction, or b) an update written to a log/delta file in
//case no compaction ever happened for it.
val df2 = spark.sql("select * from dw.f_schedule_test where dt = '2022-08-31'").withColumn("dt", lit("2022-08-30")).limit(3)
df2.write.format("hudi").
options(hudiOptions).
mode(Append).
save(basePath)
```
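A likely factor (my assumption, not confirmed in this issue): with a global index, whether a record whose partition path changes gets deleted from the old partition and re-inserted into the new one is controlled by `hoodie.bloom.index.update.partition.path`. Its default has varied across releases, so on 0.10.1 the old-partition copy may be left behind. A hedged sketch of the same write with the flag set explicitly (`hudiOptionsWithPartitionUpdate` is a name I introduce here):

```
// Sketch only: the options map from above plus the partition-path update flag.
// hoodie.bloom.index.update.partition.path is a real Hudi config, but its
// default differs by release, so setting it explicitly is the safer choice.
val hudiOptionsWithPartitionUpdate = hudiOptions ++ Map(
  "hoodie.bloom.index.update.partition.path" -> "true"
)
df2.write.format("hudi").
  options(hudiOptionsWithPartitionUpdate).
  mode(Append).
  save(basePath)
```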
**Checking scripts:**
```
select * from dw.f_schedule_test where game_schedule_id = '1';
select _hoodie_file_name, count(*) as co from dw.f_schedule_test group by _hoodie_file_name;
```
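A quicker way to confirm the duplicates (a hypothetical check I added, against the same table) is to group by the record key; any key appearing more than once is duplicated:

```
// Hypothetical duplicate check: the record key should be unique per table,
// so any key with count > 1 confirms the duplication.
spark.sql("""
  select game_schedule_id, count(*) as cnt
  from dw.f_schedule_test
  group by game_schedule_id
  having count(*) > 1
""").show()
```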
**results:**


**Expected behavior**

Duplicate records should not occur.
**Environment Description**

* Hudi version : 0.10.1
* Spark version : 3.2.1
* Hive version : 3.1.2
* Hadoop version : 3.2.1
* Storage (HDFS/S3/GCS..) : OSS
* Running on Docker? (yes/no) : no

[hoodie.zip](https://github.com/apache/hudi/files/9487065/hoodie.zip)
**Stacktrace**

Please check the logs as attached: [hoodie.zip](https://github.com/apache/hudi/files/9487078/hoodie.zip)