Brandon Scheller created HUDI-305:
-------------------------------------
Summary: Presto MOR "_rt" queries only read base parquet file
Key: HUDI-305
URL: https://issues.apache.org/jira/browse/HUDI-305
Project: Apache Hudi (incubating)
Issue Type: Bug
Reporter: Brandon Scheller
Code example to reproduce the issue:
{code:scala}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
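import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit
// needed for toDF and $ outside spark-shell; assumes a SparkSession named spark
import spark.implicits._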
val df = Seq(
  ("100", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
  ("101", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
  ("104", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
  ("105", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
).toDF("event_id", "event_name", "event_ts", "event_type")
val tableName = "hudi_events_mor_1"
val tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName
// write hudi dataset
df.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
    DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
    DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
    "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Overwrite)
  .save(tablePath)
// update a record with event_name "event_name_123" => "event_name_changed"
val df1 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
val df2 = df1.filter($"event_id" === "104")
val df3 = df2.withColumn("event_name", lit("event_name_changed"))
// update hudi dataset
df3.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
    DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
    DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
  .option("hoodie.compact.inline", "false")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
  .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
    "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .mode(SaveMode.Append)
  .save(tablePath)
{code}
Now when querying the real-time table from Hive, we have no issue seeing the
updated value:
{code:java}
hive> select event_name from hudi_events_mor_1_rt;
OK
event_name_900
event_name_changed
event_name_546
event_name_678
Time taken: 0.103 seconds, Fetched: 4 row(s)
{code}
But when querying the real-time table from Presto, we only read the base
parquet file and do not see the update that should be merged in from the log
file.
{code:java}
presto:default> select event_name from hudi_events_mor_1_rt;
event_name
----------------
event_name_900
event_name_123
event_name_546
event_name_678
(4 rows)
{code}
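One way to rule out a missing log file is to list the partition directory from
the same spark-shell session. This is only a sketch under assumptions: it
assumes the default partition layout, so that record 104 sits under
tablePath + "/type1", and it relies on Hudi log file names containing ".log.".
It reuses the spark and tablePath values from the reproduction code.
{code:scala}
import org.apache.hadoop.fs.Path

// Assumption: the partition directory is named after the raw event_type value.
val partitionPath = new Path(tablePath + "/type1")
val fs = partitionPath.getFileSystem(spark.sparkContext.hadoopConfiguration)

// Print every file in the partition and mark the ones that look like log files.
fs.listStatus(partitionPath)
  .map(_.getPath.getName)
  .foreach { name =>
    val kind = if (name.contains(".log.")) "log " else "other"
    println(s"[$kind] $name")
  }
{code}
If a log file is listed next to the base parquet file, the update was written
and the difference lies in how the query engine reads the file group.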
Our current understanding of this issue is that while
HoodieParquetRealtimeInputFormat correctly generates the splits, Presto does
not use the RealtimeCompactedRecordReader. As a result, it never reads the log
file and returns only the contents of the base parquet file.
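To make the expected versus observed behaviour concrete, here is a toy model of
the two read paths. It uses plain Scala collections and is not Hudi code: the
Event case class and the readBaseOnly and readMerged helpers are hypothetical
names made up for illustration, and the model ignores precombine ordering and
log records whose keys are new.
{code:scala}
// Toy model only: plain Scala, no Hudi or Presto APIs involved.
case class Event(eventId: String, eventName: String)

// Contents of the base parquet files after the initial insert.
val baseFile = Seq(
  Event("100", "event_name_900"),
  Event("104", "event_name_123"),
  Event("101", "event_name_546"),
  Event("105", "event_name_678")
)

// Contents of the log file written by the upsert.
val logFile = Seq(Event("104", "event_name_changed"))

// Base-only read: the log file is ignored entirely.
def readBaseOnly(base: Seq[Event]): Seq[Event] = base

// Merged read: a log record replaces the base record with the same key.
def readMerged(base: Seq[Event], log: Seq[Event]): Seq[Event] = {
  val latestByKey = log.map(e => e.eventId -> e).toMap
  base.map(e => latestByKey.getOrElse(e.eventId, e))
}

println(readBaseOnly(baseFile).map(_.eventName))
println(readMerged(baseFile, logFile).map(_.eventName))
{code}
In this model the merged read yields the values Hive returns, and the base-only
read yields the values Presto returns.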