Brandon Scheller created HUDI-305:
-------------------------------------

             Summary: Presto MOR "_rt" queries only read the base parquet file
                 Key: HUDI-305
                 URL: https://issues.apache.org/jira/browse/HUDI-305
             Project: Apache Hudi (incubating)
          Issue Type: Bug
            Reporter: Brandon Scheller


Code example to reproduce the issue:
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit // used by the update below
import spark.implicits._                  // for toDF and the $ column syntax

val df = Seq(
  ("100", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
  ("101", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
  ("104", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
  ("105", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
  ).toDF("event_id", "event_name", "event_ts", "event_type")

val tableName = "hudi_events_mor_1"
val tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName

// write hudi dataset
df.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Overwrite)
  .save(tablePath)

// update a record with event_name "event_name_123" => "event_name_changed"
val df1 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
val df2 = df1.filter($"event_id" === "104")
val df3 = df2.withColumn("event_name", lit("event_name_changed"))

// update hudi dataset
df3.write.format("org.apache.hudi")
   .option(HoodieWriteConfig.TABLE_NAME, tableName)
   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
   .option("hoodie.compact.inline", "false")
   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
   .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
   .mode(SaveMode.Append)
   .save(tablePath)
{code}
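To confirm the upsert actually landed in a log file rather than a new base file, one can list a partition directory from the same spark-shell session. This is a sketch, not part of the original repro; the file names shown in the comments are illustrative.
{code:java}
import org.apache.hadoop.fs.Path

// Inspect one partition of the MOR table ("type1" comes from event_type).
val partitionPath = new Path(tablePath + "/type1")
val fs = partitionPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.listStatus(partitionPath).map(_.getPath.getName).foreach(println)
// Expected shape of the output (names illustrative):
//   <fileId>_<writeToken>_<instantTime>.parquet  <- base file from the insert
//   .<fileId>_<instantTime>.log.1_<writeToken>   <- log file from the upsert
{code}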
Now when querying the real-time table from Hive, we have no issue seeing the 
updated value:
{code:java}
hive> select event_name from hudi_events_mor_1_rt;
OK
event_name_900
event_name_changed
event_name_546
event_name_678
Time taken: 0.103 seconds, Fetched: 4 row(s)
{code}
But when querying the real-time table from Presto, we only read the base 
parquet file and do not see the update that should be merged in from the log 
file.
{code:java}
presto:default> select event_name from hudi_events_mor_1_rt;
   event_name
----------------
 event_name_900
 event_name_123
 event_name_546
 event_name_678
(4 rows)
{code}
Our current understanding of this issue is that while 
HoodieParquetRealtimeInputFormat correctly generates the realtime splits, the 
RealtimeCompactedRecordReader is never used, so the log file is not read and 
only the base parquet file is scanned.
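A minimal sketch of the expected reader wiring, runnable from the same 
spark-shell session (class names are from Hudi 0.5.x; the JobConf setup here 
is simplified and may additionally need the Hive column projection properties 
set before getRecordReader will succeed):
{code:java}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, Reporter}
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat

val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
FileInputFormat.setInputPaths(jobConf, tablePath + "/type1")

// getSplits() produces realtime splits that carry the log-file paths.
val format = new HoodieParquetRealtimeInputFormat()
for (split <- format.getSplits(jobConf, 1)) {
  // On the Hive path, getRecordReader() wraps the base parquet reader in a
  // merging reader (RealtimeCompactedRecordReader underneath). Presto appears
  // to skip this and open the parquet file directly, which would explain why
  // the update sitting in the log file never shows up.
  val reader = format.getRecordReader(split, jobConf, Reporter.NULL)
  println(split.getClass.getSimpleName + " -> " + reader.getClass.getSimpleName)
}
{code}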