[
https://issues.apache.org/jira/browse/HUDI-9081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Davis Zhang updated HUDI-9081:
------------------------------
Fix Version/s: 1.1.0
> Type casting in MERGE INTO hits an issue
> ------------------------------
>
> Key: HUDI-9081
> URL: https://issues.apache.org/jira/browse/HUDI-9081
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Davis Zhang
> Priority: Major
> Fix For: 1.1.0
>
>
> For the same test, if I first save the values in a source table and then run
> MERGE INTO (MIT), it works fine.
> {code:java}
> test("Test All Valid Type Casting For Merge Into and Insert") {
> // Keep the ColumnTypePair case class definition
> case class ColumnTypePair(sourceType: String,
> targetType: String,
> testValue: String,
> expectedValue: Any,
> columnName: String)
> // Keep the validTypePairs definition
> val validTypePairs = Seq(
> // Numeric widening pairs
> ColumnTypePair("tinyint", "smallint", "127", 127, "tiny_to_small"),
> ColumnTypePair("tinyint", "int", "127", 127, "tiny_to_int"),
> ColumnTypePair("tinyint", "bigint", "127", 127L, "tiny_to_big"),
> ColumnTypePair("tinyint", "float", "127", 127.0f, "tiny_to_float"),
> ColumnTypePair("tinyint", "double", "127", 127.0d, "tiny_to_double"),
> ColumnTypePair("tinyint", "decimal(10,1)", "127",
> java.math.BigDecimal.valueOf(127.0), "tiny_to_decimal"),
> ColumnTypePair("smallint", "int", "32767", 32767, "small_to_int"),
> ColumnTypePair("smallint", "bigint", "32767", 32767L, "small_to_big"),
> ColumnTypePair("smallint", "float", "32767", 32767.0f, "small_to_float"),
> ColumnTypePair("smallint", "double", "32767", 32767.0d,
> "small_to_double"),
> ColumnTypePair("smallint", "decimal(10,1)", "32767",
> java.math.BigDecimal.valueOf(32767.0), "small_to_decimal"),
> ColumnTypePair("int", "bigint", "2147483647", 2147483647L, "int_to_big"),
> ColumnTypePair("int", "float", "2147483647", 2147483647.0f,
> "int_to_float"),
> ColumnTypePair("int", "double", "2147483647", 2147483647.0d,
> "int_to_double"),
> ColumnTypePair("int", "decimal(10,1)", "22",
> java.math.BigDecimal.valueOf(22.0), "int_to_decimal"),
> ColumnTypePair("float", "double", "3.14", 3.140000104904175d,
> "float_to_double"),
> ColumnTypePair("float", "decimal(10,2)", "3.14",
> java.math.BigDecimal.valueOf(3.14).setScale(2,
> java.math.RoundingMode.HALF_UP), "float_to_decimal"),
> // Timestamp/Date conversions
> ColumnTypePair("timestamp", "string", "timestamp'2023-01-01 12:00:00'",
> "2023-01-01 12:00:00", "ts_to_string"),
> ColumnTypePair("timestamp", "date", "timestamp'2023-01-01 12:00:00'",
> java.sql.Date.valueOf("2023-01-01"), "ts_to_date"),
> ColumnTypePair("date", "string", "date'2023-01-01'", "2023-01-01",
> "date_to_string"),
> ColumnTypePair("date", "timestamp", "date'2023-01-01'",
> java.sql.Timestamp.valueOf("2023-01-01 00:00:00"), "date_to_ts"),
> // Boolean conversions
> ColumnTypePair("boolean", "string", "true", "true", "bool_to_string")
> )
> Seq("cow", "mor").foreach { tableType =>
> withTempDir { tmp =>
> val targetTable = generateTableName
> // Create column definitions for target table
> val targetColumns = validTypePairs.map(p => s"${p.columnName}
> ${p.targetType}").mkString(",\n ")
> // Create target table
> spark.sql(
> s"""
> |create table $targetTable (
> | id int,
> | $targetColumns,
> | ts long
> |) using hudi
> |location '${tmp.getCanonicalPath}/$targetTable'
> |tblproperties (
> | type = '$tableType',
> | primaryKey = 'id',
> | preCombineField = 'ts'
> |)
> """.stripMargin)
> // Insert initial data into target table with null values
> val targetInsertValues = validTypePairs.map(_ => "null").mkString(", ")
> spark.sql(
> s"""
> |insert into $targetTable
> |select 1 as id, $targetInsertValues, 1000 as ts
> """.stripMargin)
> // Generate source values with explicit casting
> val sourceValues = validTypePairs.map(p => s"cast(${p.testValue} as
> ${p.sourceType}) as ${p.columnName}").mkString(",\n")
> // Perform merge operation using inline subquery
> spark.sql(
> s"""
> |merge into $targetTable t
> |using (
> | select
> | 1 as id,
> | $sourceValues,
> | cast(1001 as long) as ts
> |) s
> |on t.id = s.id
> |when matched then update set *
> |when not matched then insert *
> """.stripMargin)
> // Verify results
> val c = validTypePairs.map(p => s"${p.columnName}").mkString(",\n ")
> val result = spark.sql(s"select $c from $targetTable where id =
> 1").collect()(0)
> validTypePairs.zipWithIndex.foreach { case (pair, idx) =>
> val actualValue = result.get(idx)
> assert(actualValue == pair.expectedValue,
> s"${tableType.toUpperCase}: Column ${pair.columnName} - Expected
> ${pair.expectedValue} (${pair.expectedValue.getClass}) but got $actualValue
> (${if (actualValue != null) actualValue.getClass else "null"})")
> }
> // Test insert case using inline subquery
> val sourceValues2 = validTypePairs.map(p => s"cast(${p.testValue} as
> ${p.sourceType})").mkString(", ")
> spark.sql(
> s"""
> |merge into $targetTable t
> |using (
> | select
> | 2 as id,
> | $sourceValues2,
> | 1002 as ts
> |) s
> |on t.id = s.id
> |when matched then update set *
> |when not matched then insert *
> """.stripMargin)
> // Verify inserted row
> val result2 = spark.sql(s"select * from $targetTable where id =
> 2").collect()(0)
> validTypePairs.zipWithIndex.foreach { case (pair, idx) =>
> val actualValue = result2.get(idx + 1)
> assert(actualValue != pair.expectedValue,
> s"${tableType.toUpperCase}: Insert - Column ${pair.columnName} -
> Expected ${pair.expectedValue} (${pair.expectedValue.getClass}) but got
> $actualValue (${if (actualValue != null) actualValue.getClass else "null"})")
> }
> }
> }
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)