Ethan Guo updated HUDI-8257:
------------------------------
Priority: Critical (was: Major)
> MIT doesn't handle some spark types correctly
> ---------------------------------------------
>
> Key: HUDI-8257
> URL: https://issues.apache.org/jira/browse/HUDI-8257
> Project: Apache Hudi
> Issue Type: Bug
> Components: spark, spark-sql
> Reporter: Jonathan Vexler
> Priority: Critical
> Fix For: 1.0.0
>
>
> Here is a test
> {code:java}
> test("Test MergeInto For PreCombineField With Different Types") {
>   spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
>   withRecordType()(withTempDir { tmp =>
>     spark.sql("set hoodie.payload.combined.schema.validate = true")
>     Seq("mor").foreach { tableType =>
>       val tableName1 = generateTableName
>       spark.sql(
>         s"""
>            | create table $tableName1 (
>            |   id int,
>            |   name string,
>            |   price double,
>            |   v long,
>            |   dt string
>            | ) using hudi
>            | tblproperties (
>            |   type = '$tableType',
>            |   primaryKey = 'id',
>            |   preCombineField = 'v',
>            |   hoodie.compaction.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
>            | )
>            | partitioned by(dt)
>            | location '${tmp.getCanonicalPath}/$tableName1'
>          """.stripMargin)
>       // Insert data; the pre-combine field value type is long.
>       spark.sql(
>         s"""
>            | merge into $tableName1 as t0
>            | using (
>            |   select 1 as id, 'a1' as name, 10 as price, 1001L as v, '2021-03-21' as dt
>            | ) as s0
>            | on t0.id = s0.id
>            | when not matched and s0.id % 2 = 1 then insert *
>          """.stripMargin
>       )
>       checkAnswer(s"select id,name,price,dt,v from $tableName1")(
>         Seq(1, "a1", 10, "2021-03-21", 1001)
>       )
>       // Merge again; this time the pre-combine field value type is short.
>       spark.sql(
>         s"""
>            | merge into $tableName1 as t0
>            | using (
>            |   select 1 as id, 'a1' as name, 12 as price, 1002S as v, '2021-03-21' as dt
>            | ) as s0
>            | on t0.id = s0.id
>            | when matched then update set
>            |   id = s0.id, name = s0.name, price = s0.price, v = s0.v, dt = s0.dt
>            | when not matched then insert *
>          """.stripMargin
>       )
>     }
>   })
> }{code}
> Running this test fails. When we convert from Spark to Avro, short types
> become integers, since Avro has no 16-bit integer type. Then, in the
> expression payload, we convert back from Avro to Spark to evaluate the
> conditions and assignments. However, at that point we only have the Avro
> schema, so we use:
> {code:java}
> private def getAvroDeserializerFor(schema: Schema) = {
>   avroDeserializerCache.get()
>     .get(schema, new Function[Schema, HoodieAvroDeserializer] {
>       override def apply(t: Schema): HoodieAvroDeserializer =
>         sparkAdapter.createAvroDeserializer(schema, convertAvroSchemaToStructType(schema))
>     })
> }{code}
> Because the Spark schema is derived from the Avro schema, the field is now an
> int in the Spark schema, and evaluating the assignment fails.
> It fails because, during analysis resolution earlier, Spark saw that we were
> assigning a short to a long and inserted a Cast(short -> long) into the
> assignment expression. When that assignment is evaluated, the field's actual
> data type is int; the cast, expecting a short, tries to read the int value as
> a short, which is an illegal operation.
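> The failure mode above can be reproduced in miniature without Spark. Below is
> a hedged, plain-Java sketch: the field value is physically boxed as an
> Integer (the Avro round trip widened short to int), but the planned
> Cast(short -> long) reads it as a Short, which is an illegal unboxing on the
> JVM. The class name CastDemo is illustrative only, not part of Hudi or Spark.
> {code:java}
> // Sketch only: simulates the type mismatch described above; no Spark involved.
> public class CastDemo {
>     public static void main(String[] args) {
>         // After the Avro round trip the field is physically an int (boxed Integer).
>         Object stored = Integer.valueOf(1002);
>
>         // The analyzer planned Cast(short -> long), so the evaluator reads a short:
>         boolean failed = false;
>         try {
>             short s = (Short) stored; // ClassCastException: Integer is not a Short
>         } catch (ClassCastException e) {
>             failed = true;
>         }
>         System.out.println("short read failed: " + failed);
>
>         // Reading with the field's actual physical type succeeds:
>         System.out.println("int read: " + (Integer) stored);
>     }
> }
> {code}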
--
This message was sent by Atlassian Jira
(v8.20.10#820010)