wombatu-kun opened a new issue, #13219:
URL: https://github.com/apache/hudi/issues/13219
**Describe the problem you faced**

On COW tables everything works fine, but on MOR: when a record is deleted from the target
table, a subsequent `MERGE INTO` still sees it as existing and executes the
action from the `WHEN MATCHED` clause.
**To Reproduce**

1. Create a MOR target table.
2. Insert some records.
3. Delete one record.
4. Merge some records into the target table (one that was previously deleted,
one that exists in the target, one that does not exist in the target) with `update when
matched` and `insert when not matched` clauses.
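The steps above can be sketched in plain Spark SQL (the table names `t`/`s` and column layout are illustrative, mirroring the test below):

```sql
-- 1. create a MOR target table
create table t (id int, comb int, col0 int, col1 string) using hudi
options (type = 'mor', primaryKey = 'id', preCombineField = 'comb');

-- 2. insert some records
insert into t values (3, 30, 130, 'aa3'), (5, 50, 150, 'aa5'), (6, 60, 160, 'aa6');

-- 3. delete one record
delete from t where id = 3;

-- 4. merge: id=3 was previously deleted, id=5 exists, id=7 is new
merge into t using s on t.id = s.id
when matched then update set id = s.id, comb = s.comb, col0 = s.col0 + 1, col1 = 'oo'
when not matched then insert *;
-- on MOR, the row with id=3 is incorrectly treated as matched and gets updated
```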
**Expected behavior**

The record that was previously deleted should be inserted as is, the record that
existed in the target should be updated, the record that did not exist in the target should
be added, and records that existed only in the target should remain unchanged.
**Environment Description**
* Hudi version : current master
* Spark version : 3.5
* Hive version :
* Hadoop version :
* Storage (HDFS/S3/GCS..) :
* Running on Docker? (yes/no) :
**Additional context**

You can add this test case to `TestMergeIntoTable` and run it locally:
```
test("Test Merge Into with record that was previously deleted") {
  Seq("cow", "mor").foreach { tableType =>
    withTempDir { tmp =>
      // create and fill out the source table
      val tbParquet = generateTableName
      spark.sql(
        s"""
           |create table $tbParquet (
           |  id int, comb int, col0 int, col1 string
           |) using parquet
           |location '${tmp.getCanonicalPath}/$tbParquet'
           |""".stripMargin)
      spark.sql(
        s"""
           |insert into $tbParquet values
           |(3,30,130,'aa3'),
           |(5,50,150,'aa5'),
           |(6,60,160,'aa6')
           |""".stripMargin)
      // create the target table
      val tableName = generateTableName
      spark.sql(
        s"""
           |create table $tableName (
           |  id int, comb int, col0 int, col1 string
           |) using hudi
           |options(
           |  type='$tableType',
           |  primaryKey='id',
           |  preCombineField='comb',
           |  'hoodie.index.type'='BUCKET',
           |  'hoodie.bucket.index.num.buckets'='5'
           |)
           |location '${tmp.getCanonicalPath}/$tableName'
           |""".stripMargin)
      // fill out the target table from the source
      spark.sql(s"insert into $tableName select * from $tbParquet")
      checkAnswer(s"select id, col0, col1 from $tableName order by id")(
        Seq(3, 130, "aa3"), Seq(5, 150, "aa5"), Seq(6, 160, "aa6")
      )
      // delete one record from the target
      spark.sql(s"delete from $tableName where id = 3")
      // make sure there is no record with id=3 in the target table anymore
      checkAnswer(s"select id, col0, col1 from $tableName order by id")(
        Seq(5, 150, "aa5"), Seq(6, 160, "aa6")
      )
      // insert one more record into the source
      spark.sql(s"insert into $tbParquet values (7,70,170,'aa7')")
      // merge into the target from the source
      spark.sql(
        s"""
           |merge into $tableName t1 using $tbParquet t2 on t1.id = t2.id
           |when matched then update set id = t2.id, comb = t2.comb, col0 = t2.col0 + 1, col1 = 'oo'
           |when not matched then insert *
           |""".stripMargin)
      spark.sql(s"select * from $tableName order by id").show()
      // id=3 and id=7 should be inserted as is, id=5 and id=6 should be updated
      checkAnswer(s"select id, col0, col1 from $tableName order by id")(
        Seq(3, 130, "aa3"), Seq(5, 151, "oo"), Seq(6, 161, "oo"), Seq(7, 170, "aa7")
      )
    }
  }
}
```
**Stacktrace**

`Expected Array([3,130,aa3], [5,151,oo], [6,161,oo], [7,170,aa7]), but got Array([3,131,oo], [5,151,oo], [6,161,oo], [7,170,aa7])`