[ 
https://issues.apache.org/jira/browse/HUDI-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo reassigned HUDI-6891:
-------------------------------

    Assignee: Sagar Sumit

> Read Optimized Queries should not use RLI
> -----------------------------------------
>
>                 Key: HUDI-6891
>                 URL: https://issues.apache.org/jira/browse/HUDI-6891
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: index
>            Reporter: Amrish Lal
>            Assignee: Sagar Sumit
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> Read optimized query on a MOR table with RLI present don't produce correct 
> results as RLI lookup doesn't have the ability to distinguish between records 
> that are in base table vs records that are in log files.
> To reproduce the issue:
>  # Create a MOR table and add three records to the table.
>  # Delete one of the records (identified by uuid=5 for example) from the 
> table (this delete will go into a log file).
>  # Run read optimized query "select * from mytable", you will see all three 
> records as base table does not have the knowledge that record with id 5 was 
> deleted.
>  # Run read optimized query "select * from mytable where id = 5". This query 
> should return one record since the record is present in the base file. 
> However, if RLI is enabled this query will get evaluated against RLI and will 
> not return any records. This appears to be inconsistent with the results 
> returned in step 3.
> spark-shell script to reproduce the issue attached below:
> {code:java}
>   //
>   // Scala script for creating a table with RLI
>   //
>   import org.apache.hudi.QuickstartUtils._
>   import scala.collection.JavaConversions._
>   import org.apache.spark.sql.SaveMode._
>   import org.apache.hudi.DataSourceReadOptions._
>   import org.apache.hudi.DataSourceWriteOptions._
>   import org.apache.hudi.config.HoodieWriteConfig._
>   import org.apache.hudi.common.model.HoodieRecord
>   val tableName = "hudi_trips_cow"
>   val basePath = "file:///Users/amrish/tables/morereadoptimized"
>   val dataGen = new DataGenerator
>   // Generate inserts
>   val inserts = convertToStringList(dataGen.generateInserts(3))
>   val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
>   df.write.format("hudi").
>        |   options(getQuickstartWriteConfigs).
>        |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>        |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>        |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>        |   option(TABLE_NAME, tableName).
>        |   option("hoodie.metadata.enable", "true").
>        |   option("hoodie.metadata.record.index.enable", "true").
>        |   option("hoodie.enable.data.skipping", "true").
>        |   option("hoodie.index.type", "RECORD_INDEX").
>        |   option("hoodie.metadata.secondary.record.index.enable", "true").
>        |   option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
>        |   option("hoodie.parquet.small.file.limit", "0").
>        |   option("hoodie.compact.inline.max.delta.commits", "3").
>        |   mode(Append).
>        |   save(basePath)
> //
> // Select query
> //
> val readOpts = Map(
>     "hoodie.metadata.enable" -> "true",
>     "hoodie.metadata.record.index.enable" -> "true",
>     "hoodie.enable.data.skipping" -> "true",
>     "hoodie.index.type" -> "RECORD_INDEX"
> )
> val tripsSnapshotDF = spark.
>   read.
>   format("hudi").
>   options(readOpts).
>   load(basePath)
> tripsSnapshotDF.createOrReplaceTempView("myrli")
> spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
> //spark.sql("select count(*) from myrli").show(false)
> //
> // Delete one records (Hard Delete)
> //
> val dataset = spark.sql("select * from myrli where 
> uuid='7cb4080c-05ff-475c-94d8-ebe369ff4c2d'").limit(1)
> val deletes = dataGen.generateDeletes(dataset.collectAsList())
> val deleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
> deleteDf.write.format("hudi").
>   option(OPERATION_OPT_KEY, "delete").
>   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>   option(TABLE_NAME, tableName).
>   mode(Append).
>   save(basePath)
> spark.sql("select count(*) from myrli").show(false)
> spark.sql("select driver, rider, uuid from myrli where 
> uuid='49d34c1a-1558-4c0d-996c-1c05851f384a'").show(truncate=false)
> //
> // Read optimzied query
> //
> val readOpts = Map(
>     "hoodie.metadata.enable" -> "true",
>     "hoodie.metadata.record.index.enable" -> "false",
>     "hoodie.enable.data.skipping" -> "true",
>     "hoodie.index.type" -> "RECORD_INDEX",
>     "hoodie.datasource.query.type" -> "read_optimized",
> )
> val tripsSnapshotDF = spark.
>   read.
>   format("hudi").
>   options(readOpts).
>   load(basePath)
> tripsSnapshotDF.createOrReplaceTempView("myrli")
> spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
> spark.sql("select * from myrli WHERE 
> uuid='7cb4080c-05ff-475c-94d8-ebe369ff4c2d'").show(false)
> //spark.sql("select count(*) from myrli").show(false) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to