[
https://issues.apache.org/jira/browse/HUDI-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983043#comment-16983043
]
Bhavani Sudha commented on HUDI-305:
------------------------------------
@[~bdscheller] I was able to confirm that this is indeed the cause. The FileSplit
returned from HoodieParquetRealtimeInputFormat is implemented in the class
HoodieRealtimeFileSplit. It passes only the base parquet file to the superclass
FileSplit; the information needed for merging is carried only through the
write(DataOutput) and readFields(DataInput) methods, which are used indirectly
by RecordReaders in the Hive world. Presto, however, assumes that every split is
associated with a single file path, and it uses only the *getPath(), getStart(),
getLength()* APIs of the FileSplit class. That explains why we see results only
from the base Parquet file.
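To illustrate the mechanism, here is a minimal sketch that uses only java.io and no Hadoop or Hudi dependencies. Every class, field, and method name in it (BaseSplitSketch, RealtimeSplitSketch, deltaLogPaths, roundTrip, copyByGetters) is a hypothetical stand-in and not the actual Hudi or Presto code. It assumes the extra state is a list of log file paths, and shows that a consumer which rebuilds a split from the three getters loses that state, while a consumer that goes through write/readFields keeps it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a FileSplit-like class: one path plus a byte range.
class BaseSplitSketch {
    protected String path;
    protected long start;
    protected long length;

    BaseSplitSketch() {}

    BaseSplitSketch(String path, long start, long length) {
        this.path = path;
        this.start = start;
        this.length = length;
    }

    String getPath() { return path; }
    long getStart() { return start; }
    long getLength() { return length; }

    void write(DataOutput out) throws IOException {
        out.writeUTF(path);
        out.writeLong(start);
        out.writeLong(length);
    }

    void readFields(DataInput in) throws IOException {
        path = in.readUTF();
        start = in.readLong();
        length = in.readLong();
    }
}

// Hypothetical stand-in for a realtime split: the superclass only knows the
// base file, and the log file paths travel only through write/readFields.
class RealtimeSplitSketch extends BaseSplitSketch {
    private List<String> deltaLogPaths = new ArrayList<>();

    RealtimeSplitSketch() {}

    RealtimeSplitSketch(String baseFile, long start, long length, List<String> logs) {
        super(baseFile, start, length);
        this.deltaLogPaths = new ArrayList<>(logs);
    }

    List<String> getDeltaLogPaths() { return deltaLogPaths; }

    @Override
    void write(DataOutput out) throws IOException {
        super.write(out);
        out.writeInt(deltaLogPaths.size());
        for (String p : deltaLogPaths) {
            out.writeUTF(p);
        }
    }

    @Override
    void readFields(DataInput in) throws IOException {
        super.readFields(in);
        int n = in.readInt();
        deltaLogPaths = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            deltaLogPaths.add(in.readUTF());
        }
    }
}

class SplitSketchDemo {
    // Consumer that serializes and deserializes the split: log paths survive.
    static RealtimeSplitSketch roundTrip(RealtimeSplitSketch split) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            split.write(new DataOutputStream(bytes));
            RealtimeSplitSketch copy = new RealtimeSplitSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Consumer that rebuilds the split from the three getters: log paths are lost.
    static BaseSplitSketch copyByGetters(BaseSplitSketch split) {
        return new BaseSplitSketch(split.getPath(), split.getStart(), split.getLength());
    }

    public static void main(String[] args) {
        RealtimeSplitSketch split = new RealtimeSplitSketch(
                "base.parquet", 0L, 128L, Arrays.asList(".base.log.1"));
        System.out.println("round trip logs: " + roundTrip(split).getDeltaLogPaths());
        System.out.println("getter copy is realtime split: "
                + (copyByGetters(split) instanceof RealtimeSplitSketch));
    }
}
```

In this sketch the getter-based copy is a plain base split, so nothing downstream can find the log files. That is the behaviour described above, under the assumptions stated.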
This looks like it needs a deep implementation change in the way the presto-hive
connector leverages FileSplits. I am trying to see if there is any way we can
make a quick fix.
> Presto MOR "_rt" queries only reads base parquet file
> ------------------------------------------------------
>
> Key: HUDI-305
> URL: https://issues.apache.org/jira/browse/HUDI-305
> Project: Apache Hudi (incubating)
> Issue Type: Bug
> Components: Presto Integration
> Environment: On AWS EMR
> Reporter: Brandon Scheller
> Assignee: Bhavani Sudha Saktheeswaran
> Priority: Major
> Fix For: 0.5.1
>
>
> Code example to reproduce.
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.lit
> val df = Seq(
> ("100", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
> ("101", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
> ("104", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
> ("105", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
> ).toDF("event_id", "event_name", "event_ts", "event_type")
> var tableName = "hudi_events_mor_1"
> var tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName
> // write hudi dataset
> df.write.format("org.apache.hudi")
> .option(HoodieWriteConfig.TABLE_NAME, tableName)
> .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
> .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
> DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
> .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
> .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
> .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
> .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
> .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
> .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
> .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
> .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
> "org.apache.hudi.hive.MultiPartKeysValueExtractor")
> .mode(SaveMode.Overwrite)
> .save(tablePath)
> // update a record with event_name "event_name_123" => "event_name_changed"
> val df1 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
> val df2 = df1.filter($"event_id" === "104")
> val df3 = df2.withColumn("event_name", lit("event_name_changed"))
> // update hudi dataset
> df3.write.format("org.apache.hudi")
> .option(HoodieWriteConfig.TABLE_NAME, tableName)
> .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
> .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
> DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
> .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
> .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
> .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
> .option("hoodie.compact.inline", "false")
> .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
> .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
> .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
> .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
> .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
> "org.apache.hudi.hive.MultiPartKeysValueExtractor")
> .mode(SaveMode.Append)
> .save(tablePath)
> {code}
> Now when querying the real-time table from Hive, we have no issue seeing the
> updated value:
> {code:java}
> hive> select event_name from hudi_events_mor_1_rt;
> OK
> event_name_900
> event_name_changed
> event_name_546
> event_name_678
> Time taken: 0.103 seconds, Fetched: 4 row(s)
> {code}
> But when querying the real-time table from Presto, we only read the base
> parquet file and do not see the update that should be merged in from the log
> file.
> {code:java}
> presto:default> select event_name from hudi_events_mor_1_rt;
> event_name
> ----------------
> event_name_900
> event_name_123
> event_name_546
> event_name_678
> (4 rows)
> {code}
> Our current understanding of this issue is that while
> HoodieParquetRealtimeInputFormat correctly generates the splits, the
> RealtimeCompactedRecordReader is not used, so Presto does not read the log
> file and reads only the base parquet file.
>