[
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ethan Guo updated HUDI-7610:
----------------------------
Story Points: 4
> Delete records are inconsistent depending on MOR/COW, Avro/Spark record
> merger, new filegroup reader enabled/disabled
> ---------------------------------------------------------------------------------------------------------------------
>
> Key: HUDI-7610
> URL: https://issues.apache.org/jira/browse/HUDI-7610
> Project: Apache Hudi
> Issue Type: Bug
> Components: reader-core
> Reporter: Jonathan Vexler
> Assignee: Jonathan Vexler
> Priority: Blocker
> Fix For: 1.0.0
>
>
> Here is a test that can be run on master:
>
> {code:java}
> @Test
> def showDeleteIsInconsistent(): Unit = {
> val merger = classOf[HoodieSparkRecordMerger].getName
> //val merger = classOf[HoodieAvroRecordMerger].getName
> val useFGReader = "true"
> //val useFGReader = "false"
> //val tableType = "COPY_ON_WRITE"
> val tableType = "MERGE_ON_READ"
> val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
> val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
> (10, "2", "rider-B", "driver-B", 27.70, 1),
> (10, "3", "rider-C", "driver-C", 33.90, 10),
> (-1, "4", "rider-D", "driver-D", 34.15, 6),
> (10, "5", "rider-E", "driver-E", 17.85, 10))
> val inserts = spark.createDataFrame(data).toDF(columns: _*)
> inserts.write.format("hudi").
> option(RECORDKEY_FIELD.key(), "key").
> option(PRECOMBINE_FIELD.key(), "ts").
> option(TABLE_TYPE.key(), tableType).
> option("hoodie.table.name", "test_table").
> option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
> option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
> option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
> mode(SaveMode.Overwrite).
> save(basePath)
> val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
> (9, "2", "rider-Y", "driver-Y", 27.70, 7))
> val updates = spark.createDataFrame(updateData).toDF(columns: _*)
> updates.write.format("hudi").
> option(RECORDKEY_FIELD.key(), "key").
> option(PRECOMBINE_FIELD.key(), "ts").
> option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
> option(TABLE_TYPE.key(), tableType).
> option(OPERATION.key(), "upsert").
> option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
> option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
> option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
> mode(SaveMode.Append).
> save(basePath)
> val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
> val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
> deletes.write.format("hudi").
> option(RECORDKEY_FIELD.key(), "key").
> option(PRECOMBINE_FIELD.key(), "ts").
> option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
> option(TABLE_TYPE.key(), tableType).
> option(OPERATION.key(), "delete").
> option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
> option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
> option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
> mode(SaveMode.Append).
> save(basePath)
> val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3),
> (-10, "4", "rider-DD", "driver-DD", 34.15, 5))
> val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns:
> _*)
> secondUpdates.write.format("hudi").
> option(RECORDKEY_FIELD.key(), "key").
> option(PRECOMBINE_FIELD.key(), "ts").
> option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
> option(TABLE_TYPE.key(), tableType).
> option(OPERATION.key(), "upsert").
> option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
> option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
> option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
> mode(SaveMode.Append).
> save(basePath)
> val df = spark.read.format("hudi").
> option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
> option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(),
> "false").load(basePath)
> val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number")
> finalDf.show(100,false)
> }{code}
>
> There are 4 different outcomes:
>
>
>
> merger: Avro, useFGReader: false, tableType: cow
> merger: Avro, useFGReader: true, tableType: cow
> merger: Spark, useFGReader: false, tableType: cow
> merger: Spark, useFGReader: true, tableType: cow
> {code:java}
> +---+---+--------+---------+-----+------+
> |ts |key|rider |driver |fare |number|
> +---+---+--------+---------+-----+------+
> |11 |1 |rider-X |driver-X |19.1 |9 |
> |14 |5 |rider-Z |driver-Z |17.85|3 |
> |10 |3 |rider-C |driver-C |33.9 |10 |
> |10 |2 |rider-B |driver-B |27.7 |1 |
> |-10|4 |rider-DD|driver-DD|34.15|5 |
> +---+---+--------+---------+-----+------+{code}
>
>
>
> merger: Avro, useFGReader: false, tableType: mor
> {code:java}
> +---+---+-------+--------+-----+------+
> |ts |key|rider |driver |fare |number|
> +---+---+-------+--------+-----+------+
> |11 |1 |rider-X|driver-X|19.1 |9 |
> |14 |5 |rider-Z|driver-Z|17.85|3 |
> |-1 |4 |rider-D|driver-D|34.15|6 |
> |10 |3 |rider-C|driver-C|33.9 |10 |
> |10 |2 |rider-B|driver-B|27.7 |1 |
> +---+---+-------+--------+-----+------+ {code}
>
>
>
> merger: Avro, useFGReader: true, tableType: mor
> {code:java}
> +---+---+-------+--------+-----+------+
> |ts |key|rider |driver |fare |number|
> +---+---+-------+--------+-----+------+
> |11 |1 |rider-X|driver-X|19.1 |9 |
> |14 |5 |rider-Z|driver-Z|17.85|3 |
> |10 |3 |rider-C|driver-C|33.9 |10 |
> |10 |2 |rider-B|driver-B|27.7 |1 |
> +---+---+-------+--------+-----+------+ {code}
>
>
> merger: Spark, useFGReader: false, tableType: mor
> merger: Spark, useFGReader: true, tableType: mor
> {code:java}
> java.lang.NullPointerException{code}
> --------------------------------------------------------------------------------------------
> There is actually even more strangeness with this combo:
> merger: Avro, useFGReader: false, tableType: mor
> If I change the precombine value for the insert of record 4 to -6, we get:
> {code:java}
> +---+---+-------+--------+-----+------+
> |ts |key|rider |driver |fare |number|
> +---+---+-------+--------+-----+------+
> |11 |1 |rider-X|driver-X|19.1 |9 |
> |14 |5 |rider-Z|driver-Z|17.85|3 |
> |-6 |4 |rider-D|driver-D|34.15|6 |
> |10 |3 |rider-C|driver-C|33.9 |10 |
> |10 |2 |rider-B|driver-B|27.7 |1 |
> +---+---+-------+--------+-----+------+ {code}
> However, if I then get rid of the final upsert (the second update), the output is:
> {code:java}
> +---+---+-------+--------+-----+------+
> |ts |key|rider |driver |fare |number|
> +---+---+-------+--------+-----+------+
> |11 |1 |rider-X|driver-X|19.1 |9 |
> |10 |5 |rider-E|driver-E|17.85|10 |
> |10 |3 |rider-C|driver-C|33.9 |10 |
> |10 |2 |rider-B|driver-B|27.7 |1 |
> +---+---+-------+--------+-----+------+ {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)