Amrish Lal created HUDI-6886:
--------------------------------

             Summary: Time travel queries should not use RLI
                 Key: HUDI-6886
                 URL: https://issues.apache.org/jira/browse/HUDI-6886
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Amrish Lal


If RLI is used to evaluate a time travel query, incorrect results will be 
returned. The issue can be reproduced through the following steps:
 # Create a table and add three records into the table at three different time 
instants by running the script below three times with some time gap between 
each run.

{code:java}
  //
  // Scala script for creating a table with RLI
  //
  import org.apache.hudi.QuickstartUtils._
  import scala.collection.JavaConversions._
  import org.apache.spark.sql.SaveMode._
  import org.apache.hudi.DataSourceReadOptions._
  import org.apache.hudi.DataSourceWriteOptions._
  import org.apache.hudi.config.HoodieWriteConfig._
  import org.apache.hudi.common.model.HoodieRecord


  val tableName = "hudi_trips_cow"
  val basePath = "file:///Users/amrish/tables/travel"
  val dataGen = new DataGenerator


  // Generate inserts
  val inserts = convertToStringList(dataGen.generateInserts(3))
  val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))


  df.write.format("hudi").
       |   options(getQuickstartWriteConfigs).
       |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
       |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
       |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
       |   option(TABLE_NAME, tableName).
       |   option("hoodie.metadata.enable", "true").
       |   option("hoodie.metadata.record.index.enable", "true").
       |   option("hoodie.enable.data.skipping", "true").
       |   option("hoodie.index.type", "RECORD_INDEX").
       |   option("hoodie.metadata.secondary.record.index.enable", "true").
       |   option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
       |   option("hoodie.parquet.small.file.limit", "0").
       |   option("hoodie.compact.inline.max.delta.commits", "3").
       |   mode(Append).
       |   save(basePath) {code}

 # Run select query, ordered by _hoodie_commit_time asc, to see the data in the 
table

{code:java}
//
// Select query
//
val readOpts = Map(
    "hoodie.metadata.enable" -> "true",
    "hoodie.metadata.record.index.enable" -> "true",
    "hoodie.enable.data.skipping" -> "true",
    "hoodie.index.type" -> "RECORD_INDEX"
)


val tripsSnapshotDF = spark.
  read.
  format("hudi").
  options(readOpts).
  load(basePath)


tripsSnapshotDF.createOrReplaceTempView("myrli")
spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false) 
{code}

 # The results of the select query run above should look something like the 
result shown below

{code:java}
+-------------------+---------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------+-------------------+-------------------+----------+--------------------+-------------------+------------------+---------+-------------+------------------------------------+------------------------------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key                  
|_hoodie_partition_path              |_hoodie_file_name                         
                                |begin_lat          |begin_lon          |driver 
   |end_lat             |end_lon            |fare              |rider    |ts    
       |uuid                                |partitionpath                      
 |
+-------------------+---------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------+-------------------+-------------------+----------+--------------------+-------------------+------------------+---------+-------------+------------------------------------+------------------------------------+
|20230920163539097  
|20230920163539097_0_0|6658c690-f28d-4874-86b5-709c7360e935|americas/brazil/sao_paulo
           
|4ab59ebf-9274-44d6-bc29-9fc3112038d3-0_0-33-348_20230920163539097.parquet 
|0.4726905879569653 |0.46157858450465483|driver-213|0.754803407008858   
|0.9671159942018241 
|34.158284716382845|rider-213|1695016681714|6658c690-f28d-4874-86b5-709c7360e935|americas/brazil/sao_paulo
           |
|20230920163539097  
|20230920163539097_0_1|60eefced-6ced-41cb-9e7c-31787a8fdf2c|americas/brazil/sao_paulo
           
|4ab59ebf-9274-44d6-bc29-9fc3112038d3-0_0-33-348_20230920163539097.parquet 
|0.6100070562136587 |0.8779402295427752 |driver-213|0.3407870505929602  
|0.5030798142293655 |43.4923811219014  
|rider-213|1694997420535|60eefced-6ced-41cb-9e7c-31787a8fdf2c|americas/brazil/sao_paulo
           |
|20230920163539097  
|20230920163539097_1_0|17b1ca71-f1a0-4341-97db-bfb269a8c747|americas/united_states/san_francisco|1347f595-c7f8-446b-bd49-344b514ea503-0_1-33-349_20230920163539097.parquet
 |0.5731835407930634 |0.4923479652912024 |driver-213|0.08988581780930216 
|0.42520899698713666|64.27696295884016 
|rider-213|1695175828360|17b1ca71-f1a0-4341-97db-bfb269a8c747|americas/united_states/san_francisco|
|20230920164025402  
|20230920164025402_0_1|8b4e6aca-c49a-43de-b150-419b9e75bc62|americas/brazil/sao_paulo
           
|6b7bd997-983e-41e9-ae02-8896f13fd083-0_0-80-419_20230920164025402.parquet 
|0.1762368947074756 |0.7942627821413218 |driver-226|0.22400157419609057 
|0.08079517477095832|87.42041526408588 
|rider-226|1695061974993|8b4e6aca-c49a-43de-b150-419b9e75bc62|americas/brazil/sao_paulo
           |
|20230920164025402  
|20230920164025402_0_0|097e5763-e19f-4820-9c8e-808aba60e3ff|americas/brazil/sao_paulo
           
|6b7bd997-983e-41e9-ae02-8896f13fd083-0_0-80-419_20230920164025402.parquet 
|0.36519521355305173|0.9888075495133515 
|driver-226|0.013401540991535565|0.3794482769934313 |18.56488085068272 
|rider-226|1695131524692|097e5763-e19f-4820-9c8e-808aba60e3ff|americas/brazil/sao_paulo
           |
|20230920164025402  
|20230920164025402_1_0|f4accc3e-061a-4c6c-a957-dc5e4ed38cf7|americas/united_states/san_francisco|cb40f7c0-6c4a-425a-80b2-159f92d30ec9-0_1-80-420_20230920164025402.parquet
 |0.6220454661413275 |0.72024792576853   |driver-226|0.9048755755365163  
|0.727695054518325  |40.613510977307   
|rider-226|1694672409324|f4accc3e-061a-4c6c-a957-dc5e4ed38cf7|americas/united_states/san_francisco|
|20230920164221116  
|20230920164221116_1_0|7db9e596-48cc-47b9-b21f-b5cb5bbeb381|asia/india/chennai  
                
|24ef5d9d-f132-435d-aacc-ecb2099504fe-0_1-118-493_20230920164221116.parquet|0.06224031095826987|0.4106290929046368
 |driver-913|0.964603455586492   |0.13957566957654388|45.40019146422721 
|rider-913|1694676094520|7db9e596-48cc-47b9-b21f-b5cb5bbeb381|asia/india/chennai
                  |
|20230920164221116  
|20230920164221116_0_1|db25a621-20f0-4e96-a81a-07101d00d63e|americas/brazil/sao_paulo
           
|c8e98b87-4b87-4200-a018-d6b596f13538-0_0-118-492_20230920164221116.parquet|0.25252652214479043|0.33922164839486424|driver-913|0.909372837469859
   |0.9017656600243008 |82.36411667430927 
|rider-913|1694663124830|db25a621-20f0-4e96-a81a-07101d00d63e|americas/brazil/sao_paulo
           |
|20230920164221116  
|20230920164221116_0_0|721a395e-eb8e-401c-bbaf-b8ab2ef7cf46|americas/brazil/sao_paulo
           
|c8e98b87-4b87-4200-a018-d6b596f13538-0_0-118-492_20230920164221116.parquet|0.6346040067610669
 |0.6662084366450246 |driver-913|0.9065078444936647  |0.7124299678100179 
|5.336723040266267 
|rider-913|1695098874341|721a395e-eb8e-401c-bbaf-b8ab2ef7cf46|americas/brazil/sao_paulo
           |
+-------------------+---------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------+-------------------+-------------------+----------+--------------------+-------------------+------------------+---------+-------------+------------------------------------+------------------------------------+
 {code}

 # Note that each time instance (_hoodie_commit_time) has three records. Now 
delete one record from the middle time instance (20230920164025402). We use 
uuid to identify the record to delete as shown below

{code:java}
  //
  // Delete one records (Hard Delete)
  //
  val dataset = spark.sql("select * from myrli where 
uuid='097e5763-e19f-4820-9c8e-808aba60e3ff'").limit(1)
  val deletes = dataGen.generateDeletes(dataset.collectAsList())
  val deleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))


  deleteDf.write.format("hudi").
    option(OPERATION_OPT_KEY, "delete").
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Append).
    save(basePath) {code}

 # Run the select query from step (2) above again to confirm that the record 
was deleted.
 # Now run the same select query, but with time travel option as shown below

{code:java}
//
// Time travel query
//
val readOpts = Map(
    "hoodie.metadata.enable" -> "true",
    "hoodie.metadata.record.index.enable" -> "true",
    "hoodie.enable.data.skipping" -> "true",
    "hoodie.index.type" -> "RECORD_INDEX",
    "as.of.instant" -> "20230920164025402"
)


val tripsSnapshotDF = spark.
  read.
  format("hudi").
  options(readOpts).
  load(basePath)


tripsSnapshotDF.createOrReplaceTempView("myrli")
spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
spark.sql("select * from myrli WHERE 
uuid='097e5763-e19f-4820-9c8e-808aba60e3ff'").show(false) {code}

 # Note that the first query will show result containing six records including 
the record with uuid 097e5763-e19f-4820-9c8e-808aba60e3ff which we had deleted 
in step 4. This is perfectly ok since the result is generated, not at the 
latest time interval, but at time interval 20230920164025402. However, the 
second select query does not return any results, even though the second select 
query is also running over the dataset generated at time interval 
20230920164025402. The second query returns incorrect results because it is 
using latest RLI to evaluate a query at a time instant in past.

 

The bug here is that we cannot use RLI to evaluate time stamp query at a past 
interval becuase RLI only maintains the latest state. As fix, we should avoid 
using RLI if the "as.of.instant" flag is set.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to