[ 
https://issues.apache.org/jira/browse/HUDI-2495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-2495:
--------------------------------------
    Description: 
When the complex key generator is used and one of the record key fields is a timestamp,
the row writer path and the RDD path produce different record key values: the
GenericRecord path converts the timestamp to microseconds since the epoch, whereas the
row writer path does no conversion and uses the timestamp's string form.
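
For reference, a minimal sketch (plain Scala, outside Hudi) of the two renderings. The
exact epoch digits depend on the JVM/session timezone, so they may differ from the
tables below:

{code:scala}
import java.sql.Timestamp

val ts = Timestamp.valueOf("2014-11-30 12:40:32")

// What the GenericRecord path effectively emits: microseconds since the epoch
// (the Avro timestamp-micros representation), e.g. 1417369232000000 in the
// session timezone used for the tables below.
val asMicros: Long = ts.getTime * 1000L

// What the row writer path emits: java.sql.Timestamp.toString, note the
// trailing ".0" -> "2014-11-30 12:40:32.0"
val asString: String = ts.toString
{code}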

 

{code:scala}
import java.sql.Timestamp
import org.apache.spark.sql.SaveMode
import spark.implicits._

val df = Seq(
  (1, Timestamp.valueOf("2014-01-01 23:00:01"), "abc"),
  (1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"),
  (2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"),
  (2, Timestamp.valueOf("2016-05-09 10:12:43"), "def")
).toDF("typeId", "eventTime", "str")

df.write.format("hudi").
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.upsert.shuffle.parallelism", "2").
  option("hoodie.bulkinsert.shuffle.parallelism", "2").
  option("hoodie.datasource.write.precombine.field", "typeId").
  option("hoodie.datasource.write.partitionpath.field", "typeId").
  option("hoodie.datasource.write.recordkey.field", "str,eventTime").
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
  option("hoodie.table.name", "hudi_tbl").
  mode(SaveMode.Overwrite).
  save("/tmp/hudi_tbl_trial/")
{code}

 

{code:scala}
val hudiDF = spark.read.format("hudi").load("/tmp/hudi_tbl_trial/")

hudiDF.createOrReplaceTempView("hudi_sql_tbl")

spark.sql("select _hoodie_record_key, str, eventTime, typeId from hudi_sql_tbl").show(false)
{code}
 
{code:java}
+----------------------------------+---+-------------------+------+
|_hoodie_record_key                |str|eventTime          |typeId|
+----------------------------------+---+-------------------+------+
|str:abc,eventTime:1417369232000000|abc|2014-11-30 12:40:32|1     |
|str:abc,eventTime:1388635201000000|abc|2014-01-01 23:00:01|1     |
|str:def,eventTime:1462803163000000|def|2016-05-09 10:12:43|2     |
|str:def,eventTime:1483023240000000|def|2016-12-29 09:54:00|2     |
+----------------------------------+---+-------------------+------+
{code}
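
With the default (RDD/GenericRecord) path, the eventTime portion of _hoodie_record_key
is the timestamp converted to microseconds since the epoch (e.g. 1417369232000000 for
2014-11-30 12:40:32 in the writer's session timezone), not the original string form.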
 

 

Now retry with the bulk_insert row writer path:

{code:scala}
df.write.format("hudi").
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.upsert.shuffle.parallelism", "2").
  option("hoodie.bulkinsert.shuffle.parallelism", "2").
  option("hoodie.datasource.write.precombine.field", "typeId").
  option("hoodie.datasource.write.partitionpath.field", "typeId").
  option("hoodie.datasource.write.recordkey.field", "str,eventTime").
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").
  option("hoodie.table.name", "hudi_tbl").
  option("hoodie.datasource.write.operation", "bulk_insert").
  mode(SaveMode.Overwrite).
  save("/tmp/hudi_tbl_trial_bulk_insert/")
{code}

 

{code:scala}
val hudiDF_bulk_insert = spark.read.format("hudi").load("/tmp/hudi_tbl_trial_bulk_insert/")

hudiDF_bulk_insert.createOrReplaceTempView("hudi_sql_tbl_bulk_insert")

spark.sql("select _hoodie_record_key, str, eventTime, typeId from hudi_sql_tbl_bulk_insert").show(false)
{code}
{code:java}
+---------------------------------------+---+-------------------+------+
|_hoodie_record_key                     |str|eventTime          |typeId|
+---------------------------------------+---+-------------------+------+
|str:def,eventTime:2016-05-09 10:12:43.0|def|2016-05-09 10:12:43|2     |
|str:def,eventTime:2016-12-29 09:54:00.0|def|2016-12-29 09:54:00|2     |
|str:abc,eventTime:2014-01-01 23:00:01.0|abc|2014-01-01 23:00:01|1     |
|str:abc,eventTime:2014-11-30 12:40:32.0|abc|2014-11-30 12:40:32|1     |
+---------------------------------------+---+-------------------+------+
{code}
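
Joining the two tables on the natural key makes the mismatch easy to see side by side
(continuing the same spark-shell session; nothing Hudi-specific beyond the reads):

{code:scala}
// Read both tables and compare the record keys produced for identical input rows.
val rddKeys = spark.read.format("hudi").load("/tmp/hudi_tbl_trial/")
  .selectExpr("str", "eventTime", "_hoodie_record_key as key_generic_record")
val rowKeys = spark.read.format("hudi").load("/tmp/hudi_tbl_trial_bulk_insert/")
  .selectExpr("str", "eventTime", "_hoodie_record_key as key_row_writer")

rddKeys.join(rowKeys, Seq("str", "eventTime")).show(false)
// Every row gets a different key on each path, so an upsert issued through one
// path will not match records written through the other.
{code}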



> Difference in behavior between GenericRecord based key gen and Row based key gen
> ---------------------------------------------------------------------------------
>
>                 Key: HUDI-2495
>                 URL: https://issues.apache.org/jira/browse/HUDI-2495
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: sivabalan narayanan
>            Assignee: sivabalan narayanan
>            Priority: Major
>              Labels: sev:critical
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
