[
https://issues.apache.org/jira/browse/HUDI-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ethan Guo updated HUDI-6886:
------------------------------
Sprint: Hudi 1.0 Sprint 2024/09/23-29
> Time travel queries should not use RLI
> --------------------------------------
>
> Key: HUDI-6886
> URL: https://issues.apache.org/jira/browse/HUDI-6886
> Project: Apache Hudi
> Issue Type: Bug
> Components: index
> Reporter: Amrish Lal
> Assignee: Sagar Sumit
> Priority: Blocker
> Fix For: 1.0.0
>
>
> If RLI (the record-level index) is used to evaluate a time travel query, incorrect results are
> returned. The issue can be reproduced through the following steps:
> # Create a table and insert three records at three different time instants
> by running the script below three times, with a short gap between each run.
> {code:java}
> //
> // Scala script for creating a table with RLI
> //
> import org.apache.hudi.QuickstartUtils._
> import scala.collection.JavaConversions._
> import org.apache.spark.sql.SaveMode._
> import org.apache.hudi.DataSourceReadOptions._
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.config.HoodieWriteConfig._
> import org.apache.hudi.common.model.HoodieRecord
> val tableName = "hudi_trips_cow"
> val basePath = "file:///Users/amrish/tables/travel"
> val dataGen = new DataGenerator
> // Generate inserts
> val inserts = convertToStringList(dataGen.generateInserts(3))
> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
> df.write.format("hudi").
>   options(getQuickstartWriteConfigs).
>   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>   option(TABLE_NAME, tableName).
>   option("hoodie.metadata.enable", "true").
>   option("hoodie.metadata.record.index.enable", "true").
>   option("hoodie.enable.data.skipping", "true").
>   option("hoodie.index.type", "RECORD_INDEX").
>   option("hoodie.metadata.secondary.record.index.enable", "true").
>   option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
>   option("hoodie.parquet.small.file.limit", "0").
>   option("hoodie.compact.inline.max.delta.commits", "3").
>   mode(Append).
>   save(basePath) {code}
> # Run a select query, ordered by _hoodie_commit_time asc, to see the data in
> the table.
> {code:java}
> //
> // Select query
> //
> val readOpts = Map(
> "hoodie.metadata.enable" -> "true",
> "hoodie.metadata.record.index.enable" -> "true",
> "hoodie.enable.data.skipping" -> "true",
> "hoodie.index.type" -> "RECORD_INDEX"
> )
> val tripsSnapshotDF = spark.
>   read.
>   format("hudi").
>   options(readOpts).
>   load(basePath)
> tripsSnapshotDF.createOrReplaceTempView("myrli")
> spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
> {code}
> # The results of the select query should look similar to the following.
> {code:java}
> +-------------------+---------------------+------------------------------------+------------------------------------+
> |_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key                  |_hoodie_partition_path              |
> +-------------------+---------------------+------------------------------------+------------------------------------+
> |20230920163539097  |20230920163539097_0_0|6658c690-f28d-4874-86b5-709c7360e935|americas/brazil/sao_paulo           |
> |20230920163539097  |20230920163539097_0_1|60eefced-6ced-41cb-9e7c-31787a8fdf2c|americas/brazil/sao_paulo           |
> |20230920163539097  |20230920163539097_1_0|17b1ca71-f1a0-4341-97db-bfb269a8c747|americas/united_states/san_francisco|
> |20230920164025402  |20230920164025402_0_1|8b4e6aca-c49a-43de-b150-419b9e75bc62|americas/brazil/sao_paulo           |
> |20230920164025402  |20230920164025402_0_0|097e5763-e19f-4820-9c8e-808aba60e3ff|americas/brazil/sao_paulo           |
> |20230920164025402  |20230920164025402_1_0|f4accc3e-061a-4c6c-a957-dc5e4ed38cf7|americas/united_states/san_francisco|
> |20230920164221116  |20230920164221116_1_0|7db9e596-48cc-47b9-b21f-b5cb5bbeb381|asia/india/chennai                  |
> |20230920164221116  |20230920164221116_0_1|db25a621-20f0-4e96-a81a-07101d00d63e|americas/brazil/sao_paulo           |
> |20230920164221116  |20230920164221116_0_0|721a395e-eb8e-401c-bbaf-b8ab2ef7cf46|americas/brazil/sao_paulo           |
> +-------------------+---------------------+------------------------------------+------------------------------------+
> (the _hoodie_file_name, begin_lat, begin_lon, driver, end_lat, end_lon, fare,
> rider, ts, uuid, and partitionpath columns are omitted here for readability;
> uuid is identical to _hoodie_record_key)
> {code}
> # Note that each time instant (_hoodie_commit_time) has three records. Now
> delete one record from the middle instant (20230920164025402). We use
> uuid to identify the record to delete, as shown below.
> {code:java}
> //
> // Delete one record (hard delete)
> //
> val dataset = spark.sql("select * from myrli where uuid='097e5763-e19f-4820-9c8e-808aba60e3ff'").limit(1)
> val deletes = dataGen.generateDeletes(dataset.collectAsList())
> val deleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
> deleteDf.write.format("hudi").
>   option(OPERATION_OPT_KEY, "delete").
>   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>   option(TABLE_NAME, tableName).
>   mode(Append).
>   save(basePath) {code}
> # Run the select query from step 2 again to confirm that the record
> was deleted.
> # Now run the same select query, but with the time travel option as shown below.
> {code:java}
> //
> // Time travel query
> //
> val readOpts = Map(
> "hoodie.metadata.enable" -> "true",
> "hoodie.metadata.record.index.enable" -> "true",
> "hoodie.enable.data.skipping" -> "true",
> "hoodie.index.type" -> "RECORD_INDEX",
> "as.of.instant" -> "20230920164025402"
> )
> val tripsSnapshotDF = spark.
>   read.
>   format("hudi").
>   options(readOpts).
>   load(basePath)
> tripsSnapshotDF.createOrReplaceTempView("myrli")
> spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
> spark.sql("select * from myrli where uuid='097e5763-e19f-4820-9c8e-808aba60e3ff'").show(false) {code}
> # Note that the first query returns six records, including the record with
> uuid 097e5763-e19f-4820-9c8e-808aba60e3ff that we deleted in step 4. This is
> expected, since the result is generated not at the latest instant but as of
> instant 20230920164025402. However, the second select query returns no
> results, even though it also runs over the dataset as of instant
> 20230920164025402. The second query returns incorrect results because it uses
> the latest RLI to evaluate a query at a past instant.
>
> The bug here is that RLI cannot be used to evaluate a time travel query at a
> past instant because RLI only maintains the latest state of the table. As a
> fix, we should avoid using RLI if the "as.of.instant" option is set.
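> The proposed guard can be sketched as follows (RliGuard and canUseRli are
> hypothetical names for illustration; the actual integration point in Hudi's
> file-pruning code differs): before using RLI to prune files, check whether
> the read options carry a time travel instant, and fall back to a full scan
> if they do.
> {code:java}
> // Hypothetical sketch: decide whether the record-level index (RLI) may be
> // used for file pruning. RLI reflects only the latest table state, so any
> // time travel read ("as.of.instant" set) must bypass it.
> object RliGuard {
>   val TimeTravelKey = "as.of.instant"
>
>   // True only when RLI lookups are safe: the record index is enabled
>   // and the query is not a time travel query.
>   def canUseRli(readOpts: Map[String, String]): Boolean = {
>     val rliEnabled =
>       readOpts.getOrElse("hoodie.metadata.record.index.enable", "false").toBoolean
>     val isTimeTravel = readOpts.get(TimeTravelKey).exists(_.trim.nonEmpty)
>     rliEnabled && !isTimeTravel
>   }
> } {code}
> With the read options from the reproduction above, canUseRli returns false
> once "as.of.instant" is set, so the planner would skip RLI and evaluate the
> uuid predicate against the files visible at that instant instead.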
--
This message was sent by Atlassian Jira
(v8.20.10#820010)