[
https://issues.apache.org/jira/browse/HUDI-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ethan Guo reassigned HUDI-6891:
-------------------------------
Assignee: Sagar Sumit
> Read Optimized Queries should not use RLI
> -----------------------------------------
>
> Key: HUDI-6891
> URL: https://issues.apache.org/jira/browse/HUDI-6891
> Project: Apache Hudi
> Issue Type: Bug
> Components: index
> Reporter: Amrish Lal
> Assignee: Sagar Sumit
> Priority: Blocker
> Fix For: 1.0.0
>
>
> Read optimized query on a MOR table with RLI present doesn't produce correct
> results as RLI lookup doesn't have the ability to distinguish between records
> that are in base files vs records that are in log files.
> To reproduce the issue:
> # Create a MOR table and add three records to the table.
> # Delete one of the records (identified by uuid=5 for example) from the
> table (this delete will go into a log file).
> # Run read optimized query "select * from mytable", you will see all three
> records as base table does not have the knowledge that record with id 5 was
> deleted.
> # Run read optimized query "select * from mytable where id = 5". This query
> should return one record since the record is present in the base file.
> However, if RLI is enabled this query will get evaluated against RLI and will
> not return any records. This appears to be inconsistent with the results
> returned in step 3.
> spark-shell script to reproduce the issue attached below:
> {code:java}
> //
> // Scala script for creating a table with RLI
> //
> import org.apache.hudi.QuickstartUtils._
> import scala.collection.JavaConversions._
> import org.apache.spark.sql.SaveMode._
> import org.apache.hudi.DataSourceReadOptions._
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.config.HoodieWriteConfig._
> import org.apache.hudi.common.model.HoodieRecord
> val tableName = "hudi_trips_cow"
> val basePath = "file:///Users/amrish/tables/morereadoptimized"
> val dataGen = new DataGenerator
> // Generate inserts
> val inserts = convertToStringList(dataGen.generateInserts(3))
> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
> df.write.format("hudi").
> | options(getQuickstartWriteConfigs).
> | option(PRECOMBINE_FIELD_OPT_KEY, "ts").
> | option(RECORDKEY_FIELD_OPT_KEY, "uuid").
> | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
> | option(TABLE_NAME, tableName).
> | option("hoodie.metadata.enable", "true").
> | option("hoodie.metadata.record.index.enable", "true").
> | option("hoodie.enable.data.skipping", "true").
> | option("hoodie.index.type", "RECORD_INDEX").
> | option("hoodie.metadata.secondary.record.index.enable", "true").
> | option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
> | option("hoodie.parquet.small.file.limit", "0").
> | option("hoodie.compact.inline.max.delta.commits", "3").
> | mode(Append).
> | save(basePath)
> //
> // Select query
> //
> val readOpts = Map(
> "hoodie.metadata.enable" -> "true",
> "hoodie.metadata.record.index.enable" -> "true",
> "hoodie.enable.data.skipping" -> "true",
> "hoodie.index.type" -> "RECORD_INDEX"
> )
> val tripsSnapshotDF = spark.
> read.
> format("hudi").
> options(readOpts).
> load(basePath)
> tripsSnapshotDF.createOrReplaceTempView("myrli")
> spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
> //spark.sql("select count(*) from myrli").show(false)
> //
> // Delete one record (Hard Delete)
> //
> val dataset = spark.sql("select * from myrli where
> uuid='7cb4080c-05ff-475c-94d8-ebe369ff4c2d'").limit(1)
> val deletes = dataGen.generateDeletes(dataset.collectAsList())
> val deleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
> deleteDf.write.format("hudi").
> option(OPERATION_OPT_KEY, "delete").
> option(PRECOMBINE_FIELD_OPT_KEY, "ts").
> option(RECORDKEY_FIELD_OPT_KEY, "uuid").
> option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
> option(TABLE_NAME, tableName).
> mode(Append).
> save(basePath)
> spark.sql("select count(*) from myrli").show(false)
> spark.sql("select driver, rider, uuid from myrli where
> uuid='49d34c1a-1558-4c0d-996c-1c05851f384a'").show(truncate=false)
> //
> // Read optimized query
> //
> val readOpts = Map(
> "hoodie.metadata.enable" -> "true",
> "hoodie.metadata.record.index.enable" -> "false",
> "hoodie.enable.data.skipping" -> "true",
> "hoodie.index.type" -> "RECORD_INDEX",
> "hoodie.datasource.query.type" -> "read_optimized",
> )
> val tripsSnapshotDF = spark.
> read.
> format("hudi").
> options(readOpts).
> load(basePath)
> tripsSnapshotDF.createOrReplaceTempView("myrli")
> spark.sql("select * from myrli order by _hoodie_commit_time asc").show(false)
> spark.sql("select * from myrli WHERE
> uuid='7cb4080c-05ff-475c-94d8-ebe369ff4c2d'").show(false)
> //spark.sql("select count(*) from myrli").show(false) {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)