Jonathan Vexler created HUDI-8320:
-------------------------------------
Summary: Schema-on-read schema evolution does not support dropping
a column and then renaming another one to the original column's name
Key: HUDI-8320
URL: https://issues.apache.org/jira/browse/HUDI-8320
Project: Apache Hudi
Issue Type: Bug
Components: reader-core, writer-core
Reporter: Jonathan Vexler
Try out this test by adding it to
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala:
{code:java}
private val twoFieldSchema = StructType(Seq(
  StructField("col1", StringType),
  StructField("col2", StringType)
))

private val oneFieldSchema = StructType(Seq(
  StructField("col1", StringType)
))

test("Test simple col rename overwrite") {
  val rows = Seq(Row("aaa", "bbb"))
  val data = sparkSession.sparkContext.parallelize(rows)
  val oldRow = sparkSession.createDataFrame(data, twoFieldSchema).queryExecution.toRdd.first()
  val rowWriter1 = HoodieInternalRowUtils.genUnsafeRowWriter(twoFieldSchema, oneFieldSchema,
    JCollections.singletonMap("col1", "col2"))
  val newRow1 = rowWriter1(oldRow)
  // This does not work, see "Test alter column multiple times"
  //val serDe1 = sparkAdapter.createSparkRowSerDe(oneFieldSchema)
  //assertEquals(serDe1.deserializeRow(newRow1), Row("bbb"))
}
{code}
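The commented-out assertEquals fails when it is enabled.
This is not the expected behavior: the rename map says that the new col1 comes
from the old col2, so the result should be Row("bbb"). We have a test in
TestSpark3DDL, "Test alter column multiple times", that covers the same
drop-then-rename sequence: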
{code:java}
test("Test alter column multiple times") {
  withTempDir { tmp =>
    Seq("cow", "mor").foreach { tableType =>
      val tableName = generateTableName
      val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
      if (HoodieSparkUtils.gteqSpark3_3) {
        spark.sql("set hoodie.schema.on.read.enable=true")
        spark.sql(
          s"""
             |create table $tableName (
             |  id int,
             |  col1 string,
             |  col2 string,
             |  ts long
             |) using hudi
             | location '$tablePath'
             | options (
             |  type = '$tableType',
             |  primaryKey = 'id',
             |  preCombineField = 'ts'
             | )
           """.stripMargin)
        spark.sql(s"show create table ${tableName}").show(false)
        spark.sql(s"insert into ${tableName} values (1, 'aaa', 'bbb', 1000)")

        // Rename to a previously existing column name + insert
        spark.sql(s"alter table ${tableName} drop column col1")
        spark.sql(s"alter table ${tableName} rename column col2 to col1")
        spark.sql(s"insert into ${tableName} values (2, 'aaa', 1000)")
        checkAnswer(spark.sql(s"select col1 from ${tableName} order by id").collect())(
          Seq("bbb"), Seq("aaa")
        )
      }
    }
  }
}
{code}
If you use the Spark merger for this test, it fails.
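
To make the expected semantics concrete, here is a minimal plain-Scala sketch of how a new field could be resolved against the old schema when a rename map is present. It uses no Hudi or Spark APIs, and every name in it (RenameResolutionSketch, resolveNameFirst, resolveRenameFirst, project) is hypothetical. The assumption that the failure comes from matching by field name before consulting the rename map is only a guess at the cause and is not confirmed by this issue.

```scala
// Plain-Scala model of field resolution during a rename; no Hudi or Spark APIs are used.
// All names here (RenameResolutionSketch, resolveNameFirst, ...) are hypothetical.
object RenameResolutionSketch {

  // renamed maps NEW column name -> OLD column name, as in singletonMap("col1", "col2").
  type Resolver = (Seq[String], String, Map[String, String]) => Option[Int]

  // Assumed problematic order: match by name first, consult the rename map only on a miss.
  val resolveNameFirst: Resolver = (oldFields, newField, renamed) => {
    val byName = oldFields.indexOf(newField)
    if (byName >= 0) Some(byName)
    else renamed.get(newField).map(oldFields.indexOf(_)).filter(_ >= 0)
  }

  // Expected order: the rename map wins; fall back to the same name otherwise.
  val resolveRenameFirst: Resolver = (oldFields, newField, renamed) => {
    val sourceName = renamed.getOrElse(newField, newField)
    Some(oldFields.indexOf(sourceName)).filter(_ >= 0)
  }

  // Build the new row by pulling each new field's value from its resolved old position.
  def project(oldFields: Seq[String], oldRow: Seq[String], newFields: Seq[String],
              renamed: Map[String, String], resolve: Resolver): Seq[Option[String]] =
    newFields.map(f => resolve(oldFields, f, renamed).map(oldRow(_)))

  def main(args: Array[String]): Unit = {
    val oldFields = Seq("col1", "col2")
    val oldRow = Seq("aaa", "bbb")
    val newFields = Seq("col1")
    val renamed = Map("col1" -> "col2")

    println(project(oldFields, oldRow, newFields, renamed, resolveNameFirst))   // List(Some(aaa))
    println(project(oldFields, oldRow, newFields, renamed, resolveRenameFirst)) // List(Some(bbb))
  }
}
```

In this model the old col1 still exists in the source row, so a name-first lookup picks up "aaa" and never reaches the rename map, while a rename-first lookup yields the "bbb" that the commented-out assertion expects.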