[
https://issues.apache.org/jira/browse/HUDI-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Well Tang updated HUDI-3184:
----------------------------
Description:
{*}Problem overview{*}:
Steps to reproduce the behavior:
①Use the Spark engine to write data into a Hudi table (note: the dataset contains timestamp-typed columns).
②Use the Flink engine to read the Hudi table written in step 1.
*Actual behavior*
The Flink read fails with the following exception:
Caused by: java.lang.IllegalArgumentException: Avro does not support TIMESTAMP
type with precision: 6, it only supports precision less than 3.
at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:221) ~...
at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:263) ~...
at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:169) ~...
at org.apache.hudi.table.HoodieTableFactory.inferAvroSchema(HoodieTableFactory.java:239) ~...
at org.apache.hudi.table.HoodieTableFactory.setupConfOptions(HoodieTableFactory.java:155) ~...
at org.apache.hudi.table.HoodieTableFactory.createDynamicTableSource(HoodieTableFactory.java:65) ~...
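Per the exception message, the converter in AvroSchemaConverter only emits the timestamp-millis Avro logical type and rejects any TIMESTAMP precision above 3. A minimal sketch of the current mapping versus the proposed one (in Python for brevity; the real converter is Java, and the function names here are illustrative, not Hudi's API):

```python
# Sketch of the precision -> Avro logical-type mapping implied by the
# exception message. Function names are illustrative, not Hudi's API.

def current_mapping(precision: int) -> str:
    # What the error message says hudi-flink does today:
    # only timestamp-millis, so precision above 3 is rejected.
    if precision > 3:
        raise ValueError(
            f"Avro does not support TIMESTAMP type with precision: {precision}, "
            "it only supports precision less than 3."
        )
    return "timestamp-millis"

def proposed_mapping(precision: int) -> str:
    # Proposed: fall back to timestamp-micros for precision 4..6,
    # matching the logical type Spark already writes.
    if precision <= 3:
        return "timestamp-millis"
    if precision <= 6:
        return "timestamp-micros"
    raise ValueError(f"Avro has no logical type for TIMESTAMP({precision})")
```

With the proposed mapping, the Flink DDL `TIMESTAMP(6)` column in step-2 below would resolve to timestamp-micros instead of throwing.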
*Environment Description*
Hudi version : 0.11.0-SNAPSHOT
Spark version : 3.1.2
Flink version : 1.13.1
Hive version : None
Hadoop version : 2.9.2
Storage (HDFS/S3/GCS..) : HDFS
Running on Docker? (yes/no) : None
*Additional context*
We are using Hudi as a data lake to deliver projects to customers. A common
scenario there is to write data to a Hudi table through the Spark engine and
then read it back through the Flink engine.
Note that the exception above is triggered whenever the dataset being written
contains a timestamp column.
To simplify the description, we reduce the problem to the following steps:
【step-1】Mock data:
{code:java}
/home/deploy/spark-3.1.2-bin-hadoop2.7/bin/spark-shell \
--driver-class-path /home/workflow/apache-hive-2.3.8-bin/conf/ \
--master spark://2-120:7077 \
--executor-memory 4g \
--driver-memory 4g \
--num-executors 4 \
--total-executor-cores 4 \
--name test \
--jars /home/deploy/spark-3.1.2-bin-hadoop2.7/jars/hudi-spark3-bundle_2.12-0.11.0-SNAPSHOT.jar,/home/deploy/spark-3.1.2-bin-hadoop2.7/jars/spark-avro_2.12-3.1.2.jar \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
--conf spark.sql.hive.convertMetastoreParquet=false {code}
{code:java}
val df = spark.sql("select 1 as id, 'A' as name, current_timestamp as dt")
df.write.format("hudi").
option("hoodie.datasource.write.recordkey.field", "id").
option("hoodie.datasource.write.precombine.field", "id").
option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.upsert.shuffle.parallelism", "2").
option("hoodie.table.name", "timestamp_table").
mode("append").
save("/hudi/suite/data_type_timestamp_table")
spark.read.format("hudi").load("/hudi/suite/data_type_timestamp_table").show(false)
{code}
【step-2】Consume the data through Flink:
{code:java}
bin/sql-client.sh embedded -j lib/hudi-flink-bundle_2.12-0.11.0-SNAPSHOT.jar
{code}
{code:java}
create table data_type_timestamp_table (
`id` INT,
`name` STRING,
`dt` TIMESTAMP(6)
) with (
'connector' = 'hudi',
'hoodie.table.name' = 'data_type_timestamp_table',
'read.streaming.enabled' = 'true',
'hoodie.datasource.write.recordkey.field' = 'id',
'path' = '/hudi/suite/data_type_timestamp_table',
'read.streaming.check-interval' = '10',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'id'
);
select * from data_type_timestamp_table; {code}
As shown below:
!1.png!
If we change TIMESTAMP(6) to TIMESTAMP(3), the result is as follows:
!2.png!
The data can now be read, but the displayed values are incorrect!
Checking the files in the Hudi table directory shows that Spark writes the
timestamp column with the Avro logical type timestamp-micros:
!3.png!
However, Flink reads and writes Hudi timestamp data as timestamp-millis!
Therefore, reading and writing timestamp columns across the Spark and Flink
engines is currently broken. We hope the hudi-flink module will support
timestamp-micros so that time precision is not lost.
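The precision loss is easy to see with plain arithmetic: forcing a microsecond epoch value through a millisecond representation truncates the last three decimal digits, which cannot be recovered afterwards. A small illustration (the timestamp value is made up):

```python
# Why forcing timestamp-micros data through a timestamp-millis path loses
# information: the sub-millisecond digits are truncated and unrecoverable.

micros = 1_641_000_000_123_456      # an epoch timestamp in microseconds (example value)
millis = micros // 1000             # downcast to milliseconds
recovered = millis * 1000           # best-effort upcast back to microseconds

loss = micros - recovered           # the truncated sub-millisecond part
assert loss == 456                  # 456 microseconds are silently dropped
```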
Remaining Estimate: 5h (was: 120h)
Original Estimate: 5h (was: 120h)
> hudi-flink support timestamp-micros
> -----------------------------------
>
> Key: HUDI-3184
> URL: https://issues.apache.org/jira/browse/HUDI-3184
> Project: Apache Hudi
> Issue Type: Improvement
> Components: Flink Integration
> Reporter: Well Tang
> Assignee: Well Tang
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.11.0
>
> Attachments: 1.png, 2.png, 3.png
>
> Original Estimate: 5h
> Remaining Estimate: 5h
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)