hudi-bot opened a new issue, #14991:
URL: https://github.com/apache/hudi/issues/14991
Currently, because Spark by default omits partition values from the data files (instead encoding them into the partition paths of partitioned tables), using `TimestampBasedKeyGenerator` with a timestamp-based column makes it impossible to retrieve the original value when reading from Spark, even though it is persisted in the data file as well.
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
val df = Seq(
  (1, "z3", 30, "v1", "2018-09-23"),
  (2, "z3", 35, "v1", "2018-09-24")
).toDF("id", "name", "age", "ts", "data_date")

// mor
df.write.format("hudi").
  option(HoodieWriteConfig.TABLE_NAME, "issue_4417_mor").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option("hoodie.datasource.write.recordkey.field", "id").
  option("hoodie.datasource.write.partitionpath.field", "data_date").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.TimestampBasedKeyGenerator").
  option("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING").
  option("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd").
  option("hoodie.deltastreamer.keygen.timebased.timezone", "GMT+8:00").
  option("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd").
  mode(org.apache.spark.sql.SaveMode.Append).
  save("file:///tmp/hudi/issue_4417_mor")
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220110172709324|20220110172709324...|                 2|            2018/09/24|703e56d3-badb-40b...|  2|  z3| 35| v1|2018-09-24|
|  20220110172709324|20220110172709324...|                 1|            2018/09/23|58fde2b3-db0e-464...|  1|  z3| 30| v1|2018-09-23|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
// cannot query any data
spark.read.format("hudi").load("file:///tmp/hudi/issue_4417_mor").where("data_date = '2018-09-24'").show
// still cannot query any data
spark.read.format("hudi").load("file:///tmp/hudi/issue_4417_mor").where("data_date = '2018/09/24'").show
// cow
df.write.format("hudi").
  option(HoodieWriteConfig.TABLE_NAME, "issue_4417_cow").
  option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
  option("hoodie.datasource.write.recordkey.field", "id").
  option("hoodie.datasource.write.partitionpath.field", "data_date").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.TimestampBasedKeyGenerator").
  option("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING").
  option("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd").
  option("hoodie.deltastreamer.keygen.timebased.timezone", "GMT+8:00").
  option("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd").
  mode(org.apache.spark.sql.SaveMode.Append).
  save("file:///tmp/hudi/issue_4417_cow")
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220110172721896|20220110172721896...|                 2|            2018/09/24|81cc7819-a0d1-4e6...|  2|  z3| 35| v1|2018/09/24|
|  20220110172721896|20220110172721896...|                 1|            2018/09/23|d428019b-a829-41a...|  1|  z3| 30| v1|2018/09/23|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
// cannot query any data
spark.read.format("hudi").load("file:///tmp/hudi/issue_4417_cow").where("data_date = '2018-09-24'").show
// but 2018/09/24 works
spark.read.format("hudi").load("file:///tmp/hudi/issue_4417_cow").where("data_date = '2018/09/24'").show
{code}
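For context, the options above make `TimestampBasedKeyGenerator` parse `data_date` with the input dateformat and re-render it with the output dateformat to build the partition path. Below is a minimal sketch of that DATE_STRING conversion using `java.time` (an illustration of the behavior, not Hudi's actual implementation):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Sketch of the conversion implied by the writer options:
// parse with the input dateformat, render with the output dateformat.
val inputFmt  = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val outputFmt = DateTimeFormatter.ofPattern("yyyy/MM/dd")

def toPartitionPath(dataDate: String): String =
  LocalDate.parse(dataDate, inputFmt).format(outputFmt)

println(toPartitionPath("2018-09-24")) // 2018/09/24
```

This is why `_hoodie_partition_path` shows `2018/09/24` while the MOR data file still stores the original `2018-09-24` value.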
## JIRA info
- Link: https://issues.apache.org/jira/browse/HUDI-3204
- Type: Bug
- Epic: https://issues.apache.org/jira/browse/HUDI-5425
- Affects version(s):
  - 0.12.0
- Fix version(s):
  - 1.1.0
---
## Comments
10/Jan/22 10:08;[email protected];[~sivabalann] [~xushiyan]
I have a doubt that needs to be resolved.
In this case, the MOR and COW tables behave differently: MOR returns the original data format `yyyy-MM-dd`, while COW returns the output dateformat `yyyy/MM/dd`. Which one is correct?
And should users query with the input format, or the output format?;;;
---
10/Jan/22 12:18;shivnarayan;my understanding is that it has to be the output dateformat (i.e. yyyy/MM/dd in this context). Looks like MOR needs to be fixed. But I am really surprised that the resultant partition format differs from one table type to another. ;;;
---
10/Jan/22 13:06;[email protected];yeah, the difference between the two table types is weird. I'll modify the MOR behavior to keep it the same as the COW table. But once that is done, it will not be compatible with existing MOR tables.;;;
---
11/Jan/22 01:52;taisenki;[[email protected]] [~shivnarayan]
I think queries should use the input format; `TimestampBasedKeyGenerator` is only used for the partition key, not for the original data.
There is also a situation like the following: when using *Read Optimized Queries* to read MOR data, the result comes back in the '2018/09/24' format:
```
scala> spark.time(spark.sql("select * from issue_4417_mor_ro where id = '1'").select("data_date").show)
|data_date|
|----------|
|2018/09/23|
Time taken: 12067 ms

scala> spark.time(spark.sql("select * from issue_4417_mor_rt where id = '1'").select("data_date").show)
|data_date|
|----------|
|2018-09-23|
Time taken: 30927 ms
```
So I think, whether COW or MOR, the data in the partition column has not changed; the difference only comes from the file input format.
We should still query by the data's input format instead of the output format, which is only used for the partition key.;;;
---
14/Jan/22 00:10;shivnarayan;thanks for the update [~taisenki] .
Can you try querying using _hoodie_partition_path and see how that pans out for all the different table types and query types? ;;;
---
17/Jan/22 01:37;taisenki;[~shivnarayan]
querying with _hoodie_partition_path:
{code:java}
// MOR & SNAPSHOT QUERY
scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).load(tablePath).where("_hoodie_partition_path = '2018/09/24'").show
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220117090936761|20220117090936761...|                 2|            2018/09/24|8216950c-cf4c-4fc...|  2|  z3| 35| v1|2018-09-24|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+

// MOR & READ_OPTIMIZED QUERY
scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).load(tablePath).where("_hoodie_partition_path = '2018/09/24'").show
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220117090936761|20220117090936761...|                 2|            2018/09/24|8216950c-cf4c-4fc...|  2|  z3| 35| v1|2018/09/24|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+

// MOR & INCREMENTAL QUERY
scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 20220117090936760L).load(tablePath).where("_hoodie_partition_path = '2018/09/24'").show
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220117090936761|20220117090936761...|                 2|            2018/09/24|8216950c-cf4c-4fc...|  2|  z3| 35| v1|2018-09-24|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
{code}
querying with the partition column data_date:
{code:java}
// MOR & SNAPSHOT QUERY
scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).load(tablePath).where("data_date = '2018/09/24'").show
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name| id|name|age| ts|data_date|
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+

scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).load(tablePath).where("data_date = '2018-09-24'").show
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name| id|name|age| ts|data_date|
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+

// MOR & READ_OPTIMIZED QUERY
scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).load(tablePath).where("data_date = '2018/09/24'").show
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220117090936761|20220117090936761...|                 2|            2018/09/24|8216950c-cf4c-4fc...|  2|  z3| 35| v1|2018/09/24|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+

scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).load(tablePath).where("data_date = '2018-09-24'").show
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name| id|name|age| ts|data_date|
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+

// MOR & INCREMENTAL QUERY
scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 20220117090936760L).load(tablePath).where("data_date = '2018-09-24'").show
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220117090936761|20220117090936761...|                 2|            2018/09/24|8216950c-cf4c-4fc...|  2|  z3| 35| v1|2018-09-24|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+

scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 20220117090936760L).load(tablePath).where("data_date = '2018/09/24'").show
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name| id|name|age| ts|data_date|
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
{code}
So we can see the results differ across query types when filtering on the partition column, and I prefer the result format of the incremental query.;;;
---
27/Jan/22 03:45;[email protected];[~shivnarayan] [~taisenki]
I also agree that the result of the incremental query is better. Accordingly, we have a to-do list:
# for cow, the field `data_date` should return the original 'yyyy-MM-dd' format. It should be fixed.
# for mor, in snapshot and read_optimized queries, hudi should respond correctly to filters on 'data_date' using the 'yyyy-MM-dd' format. ;;;
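The items above amount to reversing the key generator's conversion when a partition value is reconstructed from the partition path. A hypothetical sketch of that direction, using the dateformats from this issue (illustrative only, not Hudi's actual fix):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical reverse mapping: a value recovered from the partition
// path (output dateformat) is converted back to the original input
// dateformat before being surfaced as the data_date column.
val outputFmt = DateTimeFormatter.ofPattern("yyyy/MM/dd")
val inputFmt  = DateTimeFormatter.ofPattern("yyyy-MM-dd")

def restoreOriginal(partitionValue: String): String =
  LocalDate.parse(partitionValue, outputFmt).format(inputFmt)

println(restoreOriginal("2018/09/24")) // 2018-09-24
```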
---
18/Apr/22 22:46;alexey.kudinkin;We have to roll back the original fix for this, due to the performance regression it was producing. More details can be found in HUDI-3902.;;;
---
16/Oct/24 01:56;yihua;[~jonvex] is this already done as part of your
previous fix?;;;
---
16/Oct/24 21:16;yihua;Jon has put up two fixes on reading the original value
of the timestamp partition column (not from the string partition path):
HUDI-5807 ([https://github.com/apache/hudi/pull/11770]), HUDI-8098
([https://github.com/apache/hudi/pull/11895]). So the issue is fixed.;;;
---
16/Oct/24 21:18;yihua;The new file group reader-based and
HadoopFsRelation-based (using new HoodieParquetFileFormat) query logic in Spark
can read the original value of the timestamp partition column properly. The
deprecated relation classes do not support this yet.;;;
---
16/Oct/24 21:24;yihua;We may not want to fix this issue in the deprecated relation classes, which are only used by clustering (to be revisited and fixed). We should follow up to remove the relation classes so we only maintain the new reader code path going forward.;;;
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]