Jonathan Vexler created HUDI-8257:
-------------------------------------

             Summary: MIT doesn't handle some spark types correctly
                 Key: HUDI-8257
                 URL: https://issues.apache.org/jira/browse/HUDI-8257
             Project: Apache Hudi
          Issue Type: Bug
          Components: spark, spark-sql
            Reporter: Jonathan Vexler


Here is a test that reproduces the problem:
{code:java}
test("Test MergeInto For PreCombineField With Different Types") {
  spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
  withRecordType()(withTempDir { tmp =>
    spark.sql("set hoodie.payload.combined.schema.validate = true")
    Seq("mor").foreach { tableType =>
      val tableName1 = generateTableName
      spark.sql(
        s"""
           | create table $tableName1 (
           |  id int,
           |  name string,
           |  price double,
           |  v long,
           |  dt string
           | ) using hudi
           | tblproperties (
           |  type = '$tableType',
           |  primaryKey = 'id',
           |  preCombineField = 'v',
           |  hoodie.compaction.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
           | )
           | partitioned by(dt)
           | location '${tmp.getCanonicalPath}/$tableName1'
       """.stripMargin)

      // Insert data; pre-combine field value type is long.
      spark.sql(
        s"""
           | merge into $tableName1 as t0
           | using (
           |  select 1 as id, 'a1' as name, 10 as price, 1001L as v, '2021-03-21' as dt
           | ) as s0
           | on t0.id = s0.id
           | when not matched and s0.id % 2 = 1 then insert *
       """.stripMargin
      )
      checkAnswer(s"select id,name,price,dt,v from $tableName1")(
        Seq(1, "a1", 10, "2021-03-21", 1001)
      )

      // Update the row; the pre-combine field value is now a short.
      spark.sql(
        s"""
           | merge into $tableName1 as t0
           | using (
           |  select 1 as id, 'a1' as name, 12 as price, 1002S as v, '2021-03-21' as dt
           | ) as s0
           | on t0.id = s0.id
           | when matched then update set
           | id = s0.id, name = s0.name, price = s0.price, v = s0.v, dt = s0.dt
           | when not matched then insert *
       """.stripMargin
      )
    }
  })
}{code}
Running this test fails. When we convert from Spark to Avro, short types 
become ints. Then, in the expression payload, we convert back from Avro to 
Spark to evaluate the conditions and assignments. However, at that point we 
only have the Avro schema, so we use:
{code:java}
private def getAvroDeserializerFor(schema: Schema) = {
  avroDeserializerCache.get()
    .get(schema, new Function[Schema, HoodieAvroDeserializer] {
      override def apply(t: Schema): HoodieAvroDeserializer =
        sparkAdapter.createAvroDeserializer(schema, convertAvroSchemaToStructType(schema))
    })
} {code}
Because the Spark schema is derived from the Avro schema, the field is now an 
int in the Spark schema, and evaluating the assignment fails.
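
The lossy round trip described above can be modelled with a minimal sketch. The types below are simplified, hypothetical stand-ins and not the real Spark or Avro classes; the only assumption carried over is that Avro has no 8-bit or 16-bit integer, so byte and short both widen to int and cannot be recovered from the Avro schema alone.

```scala
// Simplified, hypothetical model of the Spark <-> Avro integer type mapping.
// These are stand-in types, not Spark or Avro classes.
sealed trait SparkType
case object ByteType extends SparkType
case object ShortType extends SparkType
case object IntegerType extends SparkType
case object LongType extends SparkType

sealed trait AvroType
case object AvroInt extends AvroType
case object AvroLong extends AvroType

object SchemaRoundTrip {
  // Avro has no 8- or 16-bit integer, so narrower Spark types widen to int.
  def toAvro(t: SparkType): AvroType = t match {
    case ByteType | ShortType | IntegerType => AvroInt
    case LongType => AvroLong
  }

  // Going back, only the Avro type is known, so int always maps to IntegerType.
  def toSpark(t: AvroType): SparkType = t match {
    case AvroInt => IntegerType
    case AvroLong => LongType
  }

  // Spark -> Avro -> Spark, as happens between the writer and the payload.
  def roundTrip(t: SparkType): SparkType = toSpark(toAvro(t))
}
```

In this model, `SchemaRoundTrip.roundTrip(ShortType)` yields `IntegerType`, while `LongType` survives unchanged, which is why only the second merge in the test is affected.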
This failure occurs because during planning, Spark sees that we are trying to 
assign a short to a long, so it adds Cast(short->long) to the assignment 
expression. When that assignment is evaluated, however, the field in the data 
is an int. Because the cast was planned as short to long, the int value is 
treated as a short, which is an illegal operation, so evaluation fails.
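
The mismatch between the planned cast and the runtime value can be illustrated without Spark. This is only a sketch: `castShortToLong` is a hypothetical stand-in for an evaluator planned as Cast(short->long), not Spark's actual Cast implementation. It assumes, as the description above does, that the evaluator reads its input as a short while the deserialized row actually holds an int.

```scala
object CastMismatch {
  // Hypothetical stand-in for an evaluator planned as Cast(short -> long):
  // it assumes the incoming value is a boxed Short.
  def castShortToLong(value: Any): Long = value.asInstanceOf[Short].toLong

  def main(args: Array[String]): Unit = {
    // Value as seen at planning time: a Short, so the cast succeeds.
    println(castShortToLong(1002.toShort))

    // Value after the Spark -> Avro -> Spark round trip: a boxed Integer.
    val afterRoundTrip: Any = 1002
    try {
      castShortToLong(afterRoundTrip)
    } catch {
      // Unboxing an Integer as a Short is rejected by the JVM.
      case e: ClassCastException => println(s"cast failed: ${e.getMessage}")
    }
  }
}
```

The first call succeeds, and the second throws a `ClassCastException`, because the cast was chosen for a type that the deserialized data no longer has.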



