Amrish Lal created HUDI-6886:
--------------------------------
Summary: Time travel queries should not use RLI
Key: HUDI-6886
URL: https://issues.apache.org/jira/browse/HUDI-6886
Project: Apache Hudi
Issue Type: Bug
Reporter: Amrish Lal
If the record-level index (RLI) is used to evaluate a time travel query, incorrect results are returned. The issue can be reproduced through the following steps:
# Create a table and add three records into the table at three different time
instants by running the script below three times with some time gap between
each run.
{code:java}
//
// Scala script for creating a table with RLI
//
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///Users/amrish/tables/travel"
val dataGen = new DataGenerator
// Generate inserts
val inserts = convertToStringList(dataGen.generateInserts(3))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.metadata.enable", "true").
  option("hoodie.metadata.record.index.enable", "true").
  option("hoodie.enable.data.skipping", "true").
  option("hoodie.index.type", "RECORD_INDEX").
  option("hoodie.metadata.secondary.record.index.enable", "true").
  option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
  option("hoodie.parquet.small.file.limit", "0").
  option("hoodie.compact.inline.max.delta.commits", "3").
  mode(Append).
  save(basePath)
{code}
# Run a select query, ordered by _hoodie_commit_time asc, to see the data in the table
{code:java}
//
// Select query
//
val readOpts = Map(
"hoodie.metadata.enable" -> "true",
"hoodie.metadata.record.index.enable" -> "true",
"hoodie.enable.data.skipping" -> "true",
"hoodie.index.type" -> "RECORD_INDEX"
)
val tripsSnapshotDF = spark.
read.
format("hudi").
options(readOpts).
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("myrli")
spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
{code}
# The results of the select query run above should look something like the
result shown below
{code:java}
+-------------------+---------------------+------------------------------------+------------------------------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key                  |partitionpath                       |
+-------------------+---------------------+------------------------------------+------------------------------------+
|20230920163539097  |20230920163539097_0_0|6658c690-f28d-4874-86b5-709c7360e935|americas/brazil/sao_paulo           |
|20230920163539097  |20230920163539097_0_1|60eefced-6ced-41cb-9e7c-31787a8fdf2c|americas/brazil/sao_paulo           |
|20230920163539097  |20230920163539097_1_0|17b1ca71-f1a0-4341-97db-bfb269a8c747|americas/united_states/san_francisco|
|20230920164025402  |20230920164025402_0_1|8b4e6aca-c49a-43de-b150-419b9e75bc62|americas/brazil/sao_paulo           |
|20230920164025402  |20230920164025402_0_0|097e5763-e19f-4820-9c8e-808aba60e3ff|americas/brazil/sao_paulo           |
|20230920164025402  |20230920164025402_1_0|f4accc3e-061a-4c6c-a957-dc5e4ed38cf7|americas/united_states/san_francisco|
|20230920164221116  |20230920164221116_1_0|7db9e596-48cc-47b9-b21f-b5cb5bbeb381|asia/india/chennai                  |
|20230920164221116  |20230920164221116_0_1|db25a621-20f0-4e96-a81a-07101d00d63e|americas/brazil/sao_paulo           |
|20230920164221116  |20230920164221116_0_0|721a395e-eb8e-401c-bbaf-b8ab2ef7cf46|americas/brazil/sao_paulo           |
+-------------------+---------------------+------------------------------------+------------------------------------+
(_hoodie_partition_path, _hoodie_file_name, begin_lat, begin_lon, driver, end_lat,
end_lon, fare, rider, ts, and uuid columns omitted for readability; uuid equals
_hoodie_record_key)
{code}
# Note that each time instant (_hoodie_commit_time) has three records. Now
delete one record from the middle time instant (20230920164025402), identifying
the record to delete by its uuid as shown below
{code:java}
//
// Delete one record (hard delete)
//
val dataset = spark.sql("select * from myrli where uuid='097e5763-e19f-4820-9c8e-808aba60e3ff'").limit(1)
val deletes = dataGen.generateDeletes(dataset.collectAsList())
val deleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
deleteDf.write.format("hudi").
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
{code}
# Run the select query from step (2) again to confirm that the record was deleted.
# Now run the same select query, but with the time travel option ("as.of.instant") as shown below
{code:java}
//
// Time travel query
//
val readOpts = Map(
"hoodie.metadata.enable" -> "true",
"hoodie.metadata.record.index.enable" -> "true",
"hoodie.enable.data.skipping" -> "true",
"hoodie.index.type" -> "RECORD_INDEX",
"as.of.instant" -> "20230920164025402"
)
val tripsSnapshotDF = spark.
read.
format("hudi").
options(readOpts).
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("myrli")
spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
spark.sql("select * from myrli where uuid='097e5763-e19f-4820-9c8e-808aba60e3ff'").show(false)
{code}
# Note that the first query returns six records, including the record with uuid
097e5763-e19f-4820-9c8e-808aba60e3ff that we deleted in step 4. This is correct,
since the result is generated not at the latest instant but as of instant
20230920164025402. However, the second select query returns no results, even
though it also runs over the snapshot as of instant 20230920164025402. The
second query returns incorrect results because it uses the latest RLI to
evaluate a query at a past instant.
The root cause is that RLI only maintains the latest state of the table, so it
cannot be used to evaluate a query as of a past instant. As a fix, we should
skip RLI-based pruning whenever the "as.of.instant" option is set.
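The proposed guard can be sketched as below. This is a minimal illustration, not
the actual Hudi internals: the object and method names here are hypothetical,
and the real fix would live in the Spark reader's file-pruning path. The idea is
simply that RLI lookups are allowed only when the query targets the latest
snapshot, i.e. when no "as.of.instant" read option is present.

```scala
// Hypothetical helper illustrating the proposed fix: RLI reflects only the
// latest table state, so RLI-based file pruning must be disabled for any
// time travel (as-of-instant) read.
object RecordIndexGuard {
  val AsOfInstantKey = "as.of.instant"                       // time travel read option
  val RecordIndexEnableKey = "hoodie.metadata.record.index.enable"

  def shouldUseRecordLevelIndex(readOpts: Map[String, String]): Boolean =
    // RLI must be enabled by the reader ...
    readOpts.getOrElse(RecordIndexEnableKey, "false").toBoolean &&
      // ... and the query must target the latest snapshot, not a past instant.
      !readOpts.contains(AsOfInstantKey)
}
```

With this guard, the time travel read options from step 6 (which include
"as.of.instant" -> "20230920164025402") would fall back to regular file-slice
filtering instead of RLI, while the snapshot read options from step 2 would
still use RLI.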
--
This message was sent by Atlassian Jira
(v8.20.10#820010)