[ 
https://issues.apache.org/jira/browse/HUDI-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Y Ethan Guo updated HUDI-8257:
------------------------------
    Fix Version/s: 1.1.0
                       (was: 1.0.0)

> MIT doesn't handle some spark types correctly
> ---------------------------------------------
>
>                 Key: HUDI-8257
>                 URL: https://issues.apache.org/jira/browse/HUDI-8257
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: spark, spark-sql
>            Reporter: Jonathan Vexler
>            Priority: Critical
>             Fix For: 1.1.0
>
>
> Here is a test
> {code:java}
> test("Test MergeInto For PreCombineField With Different Types") {
>   spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
>   withRecordType()(withTempDir { tmp =>
>     spark.sql("set hoodie.payload.combined.schema.validate = true")
>     Seq("mor").foreach { tableType =>
>       val tableName1 = generateTableName
>       spark.sql(
>         s"""
>            | create table $tableName1 (
>            |  id int,
>            |  name string,
>            |  price double,
>            |  v long,
>            |  dt string
>            | ) using hudi
>            | tblproperties (
>            |  type = '$tableType',
>            |  primaryKey = 'id',
>            |  preCombineField = 'v',
>            |  hoodie.compaction.payload.class = 
> 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
>            | )
>            | partitioned by(dt)
>            | location '${tmp.getCanonicalPath}/$tableName1'
>        """.stripMargin)
>       // Insert data; pre-combine field value type is long.
>       spark.sql(
>         s"""
>            | merge into $tableName1 as t0
>            | using (
>            |  select 1 as id, 'a1' as name, 10 as price, 1001L as v, 
> '2021-03-21' as dt
>            | ) as s0
>            | on t0.id = s0.id
>            | when not matched and s0.id % 2 = 1 then insert *
>        """.stripMargin
>       )
>       checkAnswer(s"select id,name,price,dt,v from $tableName1")(
>         Seq(1, "a1", 10, "2021-03-21", 1001)
>       )
>       // Insert data; pre-combine field value type is short.
>     spark.sql(
>       s"""
>          | merge into $tableName1 as t0
>          | using (
>          |  select 1 as id, 'a1' as name, 12 as price, 1002S as v, 
> '2021-03-21' as dt
>          | ) as s0
>          | on t0.id = s0.id
>          | when matched then update set
>          | id = s0.id, name = s0.name, price = s0.price, v = s0.v, dt = s0.dt
>          | when not matched then insert *
>      """.stripMargin
>     )
>     }
>   })
> }{code}
> If you run this it will fail. This is because when we convert from spark to 
> avro, short types just become integers. Then, in the expression payload we 
> convert back from avro to spark to evaluate the conditions and assignments. 
> However, we only have the avro schema so we use :
> {code:java}
> private def getAvroDeserializerFor(schema: Schema) = {
>   avroDeserializerCache.get()
>     .get(schema, new Function[Schema, HoodieAvroDeserializer] {
>       override def apply(t: Schema): HoodieAvroDeserializer =
>         sparkAdapter.createAvroDeserializer(schema, 
> convertAvroSchemaToStructType(schema))
>     })
> } {code}
> We are getting the spark schema from the avro schema, so the field will now 
> be an int in the spark schema. Then, when we try to evaluate the assignment 
> it will fail.
> This failure occurs because when it was doing analysis resolution earlier, 
> spark sees that we are trying to assign a short to a long. It adds 
> Cast(short->long) in the assignment expression. When we try to evaluate that 
> assignment, the field type of the data is int. It tries to do the cast, but 
> since the cast is from short to long, the int is casted as a short. This 
> fails because it is an illegal operation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to