[
https://issues.apache.org/jira/browse/HUDI-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jonathan Vexler updated HUDI-8257:
----------------------------------
Description:
Here is a test
{code:java}
test("Test MergeInto For PreCombineField With Different Types") {
spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
Seq("mor").foreach { tableType =>
val tableName1 = generateTableName
spark.sql(
s"""
| create table $tableName1 (
| id int,
| name string,
| price double,
| v long,
| dt string
| ) using hudi
| tblproperties (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'v',
| hoodie.compaction.payload.class =
'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
| )
| partitioned by(dt)
| location '${tmp.getCanonicalPath}/$tableName1'
""".stripMargin)
// Insert data; pre-combine field value type is long.
spark.sql(
s"""
| merge into $tableName1 as t0
| using (
| select 1 as id, 'a1' as name, 10 as price, 1001L as v,
'2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when not matched and s0.id % 2 = 1 then insert *
""".stripMargin
)
checkAnswer(s"select id,name,price,dt,v from $tableName1")(
Seq(1, "a1", 10, "2021-03-21", 1001)
)
// Insert data; pre-combine field value type is short.
spark.sql(
s"""
| merge into $tableName1 as t0
| using (
| select 1 as id, 'a1' as name, 12 as price, 1002S as v, '2021-03-21'
as dt
| ) as s0
| on t0.id = s0.id
| when matched then update set
| id = s0.id, name = s0.name, price = s0.price, v = s0.v, dt = s0.dt
| when not matched then insert *
""".stripMargin
)
}
})
}{code}
If you run this it will fail. This is because when we convert from spark to
avro, short types just become integers. Then, in the expression payload we
convert back from avro to spark to evaluate the conditions and assignments.
However, we only have the avro schema so we use :
{code:java}
private def getAvroDeserializerFor(schema: Schema) = {
avroDeserializerCache.get()
.get(schema, new Function[Schema, HoodieAvroDeserializer] {
override def apply(t: Schema): HoodieAvroDeserializer =
sparkAdapter.createAvroDeserializer(schema,
convertAvroSchemaToStructType(schema))
})
} {code}
We are getting the spark schema from the avro schema, so the field will now be
an int in the spark schema. Then, when we try to evaluate the assignment it
will fail.
This failure occurs because when it was doing analysis resolution earlier,
spark sees that we are trying to assign a short to a long. It adds
Cast(short->long) in the assignment expression. When we try to evaluate that
assignment, the field type of the data is int. It tries to do the cast, but
since the cast is from short to long, the int is casted as a short. This fails
because it is an illegal operation
was:
Here is a test
{code:java}
test("Test MergeInto For PreCombineField With Different Types") {
spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
Seq("mor").foreach { tableType =>
val tableName1 = generateTableName
spark.sql(
s"""
| create table $tableName1 (
| id int,
| name string,
| price double,
| v long,
| dt string
| ) using hudi
| tblproperties (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'v',
| hoodie.compaction.payload.class =
'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
| )
| partitioned by(dt)
| location '${tmp.getCanonicalPath}/$tableName1'
""".stripMargin)
// Insert data; pre-combine field value type is long.
spark.sql(
s"""
| merge into $tableName1 as t0
| using (
| select 1 as id, 'a1' as name, 10 as price, 1001L as v,
'2021-03-21' as dt
| ) as s0
| on t0.id = s0.id
| when not matched and s0.id % 2 = 1 then insert *
""".stripMargin
)
checkAnswer(s"select id,name,price,dt,v from $tableName1")(
Seq(1, "a1", 10, "2021-03-21", 1001)
)
// Insert data; pre-combine field value type is short.
spark.sql(
s"""
| merge into $tableName1 as t0
| using (
| select 1 as id, 'a1' as name, 12 as price, 1002S as v, '2021-03-21'
as dt
| ) as s0
| on t0.id = s0.id
| when matched then update set
| id = s0.id, name = s0.name, price = s0.price, v = s0.v, dt = s0.dt
| when not matched then insert *
""".stripMargin
)
}
})
}{code}
If you run this it will fail. This is because when we convert from spark to
avro, short types just become integers. Then, in the expression payload we
convert back from avro to spark to evaluate the conditions and assignments.
However, we only have the avro schema so we use :
{code:java}
private def getAvroDeserializerFor(schema: Schema) = {
avroDeserializerCache.get()
.get(schema, new Function[Schema, HoodieAvroDeserializer] {
override def apply(t: Schema): HoodieAvroDeserializer =
sparkAdapter.createAvroDeserializer(schema,
convertAvroSchemaToStructType(schema))
})
} {code}
We are getting the spark schema from the avro schema, so the field will now be
an int in the spark schema. Then, when we try to evaluate the assignment it
will fail.
This failure occurs because during planning, spark sees that we are trying to
assign a short to a long. It adds Cast(short->long) in the assignment
expression. When we try to evaluate that assignment, the field type of the data
is int. It tries to do the cast, but since the cast is from short to long, the
int is casted as a short. This fails because it is an illegal operation
> MIT doesn't handle some spark types correctly
> ---------------------------------------------
>
> Key: HUDI-8257
> URL: https://issues.apache.org/jira/browse/HUDI-8257
> Project: Apache Hudi
> Issue Type: Bug
> Components: spark, spark-sql
> Reporter: Jonathan Vexler
> Priority: Major
>
> Here is a test
> {code:java}
> test("Test MergeInto For PreCombineField With Different Types") {
> spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
> withRecordType()(withTempDir { tmp =>
> spark.sql("set hoodie.payload.combined.schema.validate = true")
> Seq("mor").foreach { tableType =>
> val tableName1 = generateTableName
> spark.sql(
> s"""
> | create table $tableName1 (
> | id int,
> | name string,
> | price double,
> | v long,
> | dt string
> | ) using hudi
> | tblproperties (
> | type = '$tableType',
> | primaryKey = 'id',
> | preCombineField = 'v',
> | hoodie.compaction.payload.class =
> 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
> | )
> | partitioned by(dt)
> | location '${tmp.getCanonicalPath}/$tableName1'
> """.stripMargin)
> // Insert data; pre-combine field value type is long.
> spark.sql(
> s"""
> | merge into $tableName1 as t0
> | using (
> | select 1 as id, 'a1' as name, 10 as price, 1001L as v,
> '2021-03-21' as dt
> | ) as s0
> | on t0.id = s0.id
> | when not matched and s0.id % 2 = 1 then insert *
> """.stripMargin
> )
> checkAnswer(s"select id,name,price,dt,v from $tableName1")(
> Seq(1, "a1", 10, "2021-03-21", 1001)
> )
> // Insert data; pre-combine field value type is short.
> spark.sql(
> s"""
> | merge into $tableName1 as t0
> | using (
> | select 1 as id, 'a1' as name, 12 as price, 1002S as v,
> '2021-03-21' as dt
> | ) as s0
> | on t0.id = s0.id
> | when matched then update set
> | id = s0.id, name = s0.name, price = s0.price, v = s0.v, dt = s0.dt
> | when not matched then insert *
> """.stripMargin
> )
> }
> })
> }{code}
> If you run this it will fail. This is because when we convert from spark to
> avro, short types just become integers. Then, in the expression payload we
> convert back from avro to spark to evaluate the conditions and assignments.
> However, we only have the avro schema so we use :
> {code:java}
> private def getAvroDeserializerFor(schema: Schema) = {
> avroDeserializerCache.get()
> .get(schema, new Function[Schema, HoodieAvroDeserializer] {
> override def apply(t: Schema): HoodieAvroDeserializer =
> sparkAdapter.createAvroDeserializer(schema,
> convertAvroSchemaToStructType(schema))
> })
> } {code}
> We are getting the spark schema from the avro schema, so the field will now
> be an int in the spark schema. Then, when we try to evaluate the assignment
> it will fail.
> This failure occurs because when it was doing analysis resolution earlier,
> spark sees that we are trying to assign a short to a long. It adds
> Cast(short->long) in the assignment expression. When we try to evaluate that
> assignment, the field type of the data is int. It tries to do the cast, but
> since the cast is from short to long, the int is casted as a short. This
> fails because it is an illegal operation
--
This message was sent by Atlassian Jira
(v8.20.10#820010)