[
https://issues.apache.org/jira/browse/HUDI-2495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-2495:
--------------------------------------
Description:
when the complex key generator is used and one of the fields in the record key is a timestamp
field, the row writer path and the RDD path give different record key values.
The GenericRecord path converts the timestamp, whereas the row writer path does not do any
conversion.
import java.sql.Timestamp
import spark.implicits._
val df = Seq(
(1, Timestamp.valueOf("2014-01-01 23:00:01"), "abc"),
(1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"),
(2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"),
(2, Timestamp.valueOf("2016-05-09 10:12:43"), "def")
).toDF("typeId","eventTime", "str")
df.write.format("hudi").
option("hoodie.insert.shuffle.parallelism", "2").
option("hoodie.upsert.shuffle.parallelism", "2").
option("hoodie.bulkinsert.shuffle.parallelism", "2").
option("hoodie.datasource.write.precombine.field", "typeId").
option("hoodie.datasource.write.partitionpath.field", "typeId").
option("hoodie.datasource.write.recordkey.field", "str,eventTime").
option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator").
option("hoodie.table.name", "hudi_tbl").
mode(Overwrite).
save("/tmp/hudi_tbl_trial/")
val hudiDF = spark.read.format("hudi").load("/tmp/hudi_tbl_trial/")
hudiDF.createOrReplaceTempView("hudi_sql_tbl")
spark.sql("select _hoodie_record_key, str, eventTime, typeId from
hudi_sql_tbl").show(false)
{code:java}
+----------------------------------+---+-------------------+------+
|_hoodie_record_key |str|eventTime |typeId|
+----------------------------------+---+-------------------+------+
|str:abc,eventTime:1417369232000000|abc|2014-11-30 12:40:32|1 |
|str:abc,eventTime:1388635201000000|abc|2014-01-01 23:00:01|1 |
|str:def,eventTime:1462803163000000|def|2016-05-09 10:12:43|2 |
|str:def,eventTime:1483023240000000|def|2016-12-29 09:54:00|2 |
+----------------------------------+---+-------------------+------+
{code}
// now retry w/ bulk_insert row writer path
df.write.format("hudi").
option("hoodie.insert.shuffle.parallelism", "2").
option("hoodie.upsert.shuffle.parallelism", "2").
option("hoodie.bulkinsert.shuffle.parallelism", "2").
option("hoodie.datasource.write.precombine.field", "typeId").
option("hoodie.datasource.write.partitionpath.field", "typeId").
option("hoodie.datasource.write.recordkey.field", "str,eventTime").
option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator").
option("hoodie.table.name", "hudi_tbl").
option("hoodie.datasource.write.operation","bulk_insert").
mode(Overwrite).
save("/tmp/hudi_tbl_trial_bulk_insert/")
val hudiDF_bulk_insert =
spark.read.format("hudi").load("/tmp/hudi_tbl_trial_bulk_insert/")
hudiDF_bulk_insert.createOrReplaceTempView("hudi_sql_tbl_bulk_insert")
spark.sql("select _hoodie_record_key, str, eventTime, typeId from
hudi_sql_tbl_bulk_insert").show(false)
{code:java}
+---------------------------------------+---+-------------------+------+
|_hoodie_record_key |str|eventTime |typeId|
+---------------------------------------+---+-------------------+------+
|str:def,eventTime:2016-05-09 10:12:43.0|def|2016-05-09 10:12:43|2 |
|str:def,eventTime:2016-12-29 09:54:00.0|def|2016-12-29 09:54:00|2 |
|str:abc,eventTime:2014-01-01 23:00:01.0|abc|2014-01-01 23:00:01|1 |
|str:abc,eventTime:2014-11-30 12:40:32.0|abc|2014-11-30 12:40:32|1 |
+---------------------------------------+---+-------------------+------+
{code}
was:
when the complex key generator is used and one of the fields in the record key is a timestamp
field, the row writer path and the RDD path give different record key values.
import java.sql.Timestamp
import spark.implicits._
val df = Seq(
(1, Timestamp.valueOf("2014-01-01 23:00:01"), "abc"),
(1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"),
(2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"),
(2, Timestamp.valueOf("2016-05-09 10:12:43"), "def")
).toDF("typeId","eventTime", "str")
df.write.format("hudi").
option("hoodie.insert.shuffle.parallelism", "2").
option("hoodie.upsert.shuffle.parallelism", "2").
option("hoodie.bulkinsert.shuffle.parallelism", "2").
option("hoodie.datasource.write.precombine.field", "typeId").
option("hoodie.datasource.write.partitionpath.field", "typeId").
option("hoodie.datasource.write.recordkey.field", "str,eventTime").
option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator").
option("hoodie.table.name", "hudi_tbl").
mode(Overwrite).
save("/tmp/hudi_tbl_trial/")
val hudiDF = spark.read.format("hudi").load("/tmp/hudi_tbl_trial/")
hudiDF.createOrReplaceTempView("hudi_sql_tbl")
spark.sql("select _hoodie_record_key, str, eventTime, typeId from
hudi_sql_tbl").show(false)
{code:java}
+----------------------------------+---+-------------------+------+
|_hoodie_record_key |str|eventTime |typeId|
+----------------------------------+---+-------------------+------+
|str:abc,eventTime:1417369232000000|abc|2014-11-30 12:40:32|1 |
|str:abc,eventTime:1388635201000000|abc|2014-01-01 23:00:01|1 |
|str:def,eventTime:1462803163000000|def|2016-05-09 10:12:43|2 |
|str:def,eventTime:1483023240000000|def|2016-12-29 09:54:00|2 |
+----------------------------------+---+-------------------+------+
{code}
// now retry w/ bulk_insert row writer path
df.write.format("hudi").
option("hoodie.insert.shuffle.parallelism", "2").
option("hoodie.upsert.shuffle.parallelism", "2").
option("hoodie.bulkinsert.shuffle.parallelism", "2").
option("hoodie.datasource.write.precombine.field", "typeId").
option("hoodie.datasource.write.partitionpath.field", "typeId").
option("hoodie.datasource.write.recordkey.field", "str,eventTime").
option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator").
option("hoodie.table.name", "hudi_tbl").
option("hoodie.datasource.write.operation","bulk_insert").
mode(Overwrite).
save("/tmp/hudi_tbl_trial_bulk_insert/")
val hudiDF_bulk_insert =
spark.read.format("hudi").load("/tmp/hudi_tbl_trial_bulk_insert/")
hudiDF_bulk_insert.createOrReplaceTempView("hudi_sql_tbl_bulk_insert")
spark.sql("select _hoodie_record_key, str, eventTime, typeId from
hudi_sql_tbl_bulk_insert").show(false)
{code:java}
+---------------------------------------+---+-------------------+------+
|_hoodie_record_key |str|eventTime |typeId|
+---------------------------------------+---+-------------------+------+
|str:def,eventTime:2016-05-09 10:12:43.0|def|2016-05-09 10:12:43|2 |
|str:def,eventTime:2016-12-29 09:54:00.0|def|2016-12-29 09:54:00|2 |
|str:abc,eventTime:2014-01-01 23:00:01.0|abc|2014-01-01 23:00:01|1 |
|str:abc,eventTime:2014-11-30 12:40:32.0|abc|2014-11-30 12:40:32|1 |
+---------------------------------------+---+-------------------+------+
{code}
> Difference in behavior between GenericRecord based key gen and Row based key
> gen
> ---------------------------------------------------------------------------------
>
> Key: HUDI-2495
> URL: https://issues.apache.org/jira/browse/HUDI-2495
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: sivabalan narayanan
> Assignee: sivabalan narayanan
> Priority: Major
> Labels: sev:critical
>
> when the complex key generator is used and one of the fields in the record key is a
> timestamp field, the row writer path and the RDD path give different record key
> values. The GenericRecord path converts the timestamp, whereas the row writer path
> does not do any conversion.
>
> import java.sql.Timestamp
> import spark.implicits._
> val df = Seq(
> (1, Timestamp.valueOf("2014-01-01 23:00:01"), "abc"),
> (1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"),
> (2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"),
> (2, Timestamp.valueOf("2016-05-09 10:12:43"), "def")
> ).toDF("typeId","eventTime", "str")
>
> df.write.format("hudi").
> option("hoodie.insert.shuffle.parallelism", "2").
> option("hoodie.upsert.shuffle.parallelism", "2").
> option("hoodie.bulkinsert.shuffle.parallelism", "2").
> option("hoodie.datasource.write.precombine.field", "typeId").
> option("hoodie.datasource.write.partitionpath.field", "typeId").
> option("hoodie.datasource.write.recordkey.field", "str,eventTime").
>
> option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator").
> option("hoodie.table.name", "hudi_tbl").
> mode(Overwrite).
> save("/tmp/hudi_tbl_trial/")
>
> val hudiDF = spark.read.format("hudi").load("/tmp/hudi_tbl_trial/")
> hudiDF.createOrReplaceTempView("hudi_sql_tbl")
> spark.sql("select _hoodie_record_key, str, eventTime, typeId from
> hudi_sql_tbl").show(false)
>
> {code:java}
> +----------------------------------+---+-------------------+------+
> |_hoodie_record_key |str|eventTime |typeId|
> +----------------------------------+---+-------------------+------+
> |str:abc,eventTime:1417369232000000|abc|2014-11-30 12:40:32|1 |
> |str:abc,eventTime:1388635201000000|abc|2014-01-01 23:00:01|1 |
> |str:def,eventTime:1462803163000000|def|2016-05-09 10:12:43|2 |
> |str:def,eventTime:1483023240000000|def|2016-12-29 09:54:00|2 |
> +----------------------------------+---+-------------------+------+
> {code}
>
>
> // now retry w/ bulk_insert row writer path
> df.write.format("hudi").
> option("hoodie.insert.shuffle.parallelism", "2").
> option("hoodie.upsert.shuffle.parallelism", "2").
> option("hoodie.bulkinsert.shuffle.parallelism", "2").
> option("hoodie.datasource.write.precombine.field", "typeId").
> option("hoodie.datasource.write.partitionpath.field", "typeId").
> option("hoodie.datasource.write.recordkey.field", "str,eventTime").
>
> option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator").
> option("hoodie.table.name", "hudi_tbl").
> option("hoodie.datasource.write.operation","bulk_insert").
> mode(Overwrite).
> save("/tmp/hudi_tbl_trial_bulk_insert/")
>
> val hudiDF_bulk_insert =
> spark.read.format("hudi").load("/tmp/hudi_tbl_trial_bulk_insert/")
> hudiDF_bulk_insert.createOrReplaceTempView("hudi_sql_tbl_bulk_insert")
> spark.sql("select _hoodie_record_key, str, eventTime, typeId from
> hudi_sql_tbl_bulk_insert").show(false)
> {code:java}
> +---------------------------------------+---+-------------------+------+
> |_hoodie_record_key |str|eventTime |typeId|
> +---------------------------------------+---+-------------------+------+
> |str:def,eventTime:2016-05-09 10:12:43.0|def|2016-05-09 10:12:43|2 |
> |str:def,eventTime:2016-12-29 09:54:00.0|def|2016-12-29 09:54:00|2 |
> |str:abc,eventTime:2014-01-01 23:00:01.0|abc|2014-01-01 23:00:01|1 |
> |str:abc,eventTime:2014-11-30 12:40:32.0|abc|2014-11-30 12:40:32|1 |
> +---------------------------------------+---+-------------------+------+
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)