[ 
https://issues.apache.org/jira/browse/HUDI-9081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davis Zhang updated HUDI-9081:
------------------------------
    Fix Version/s: 1.1.0

> Type casting in MERGE INTO hits an issue
> ------------------------------
>
>                 Key: HUDI-9081
>                 URL: https://issues.apache.org/jira/browse/HUDI-9081
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Davis Zhang
>            Priority: Major
>             Fix For: 1.1.0
>
>
> For the same test, if I first save the values in a source table and then run 
> MERGE INTO (MIT), it works fine.
> {code:java}
> test("Test All Valid Type Casting For Merge Into and Insert") {
>   // Keep the ColumnTypePair case class definition
>   case class ColumnTypePair(sourceType: String,
>                            targetType: String,
>                            testValue: String,
>                            expectedValue: Any,
>                            columnName: String)
>   // Keep the validTypePairs definition
>   val validTypePairs = Seq(
>     // Numeric widening pairs
>     ColumnTypePair("tinyint", "smallint", "127", 127, "tiny_to_small"),
>     ColumnTypePair("tinyint", "int", "127", 127, "tiny_to_int"),
>     ColumnTypePair("tinyint", "bigint", "127", 127L, "tiny_to_big"),
>     ColumnTypePair("tinyint", "float", "127", 127.0f, "tiny_to_float"),
>     ColumnTypePair("tinyint", "double", "127", 127.0d, "tiny_to_double"),
>     ColumnTypePair("tinyint", "decimal(10,1)", "127", 
> java.math.BigDecimal.valueOf(127.0), "tiny_to_decimal"),
>     ColumnTypePair("smallint", "int", "32767", 32767, "small_to_int"),
>     ColumnTypePair("smallint", "bigint", "32767", 32767L, "small_to_big"),
>     ColumnTypePair("smallint", "float", "32767", 32767.0f, "small_to_float"),
>     ColumnTypePair("smallint", "double", "32767", 32767.0d, 
> "small_to_double"),
>     ColumnTypePair("smallint", "decimal(10,1)", "32767", 
> java.math.BigDecimal.valueOf(32767.0), "small_to_decimal"),
>     ColumnTypePair("int", "bigint", "2147483647", 2147483647L, "int_to_big"),
>     ColumnTypePair("int", "float", "2147483647", 2147483647.0f, 
> "int_to_float"),
>     ColumnTypePair("int", "double", "2147483647", 2147483647.0d, 
> "int_to_double"),
>     ColumnTypePair("int", "decimal(10,1)", "22", 
> java.math.BigDecimal.valueOf(22.0), "int_to_decimal"),
>     ColumnTypePair("float", "double", "3.14", 3.140000104904175d, 
> "float_to_double"),
>     ColumnTypePair("float", "decimal(10,2)", "3.14", 
> java.math.BigDecimal.valueOf(3.14).setScale(2, 
> java.math.RoundingMode.HALF_UP), "float_to_decimal"),
>     // Timestamp/Date conversions
>     ColumnTypePair("timestamp", "string", "timestamp'2023-01-01 12:00:00'", 
> "2023-01-01 12:00:00", "ts_to_string"),
>     ColumnTypePair("timestamp", "date", "timestamp'2023-01-01 12:00:00'", 
> java.sql.Date.valueOf("2023-01-01"), "ts_to_date"),
>     ColumnTypePair("date", "string", "date'2023-01-01'", "2023-01-01", 
> "date_to_string"),
>     ColumnTypePair("date", "timestamp", "date'2023-01-01'", 
> java.sql.Timestamp.valueOf("2023-01-01 00:00:00"), "date_to_ts"),
>     // Boolean conversions
>     ColumnTypePair("boolean", "string", "true", "true", "bool_to_string")
>   )
>   Seq("cow", "mor").foreach { tableType =>
>     withTempDir { tmp =>
>       val targetTable = generateTableName
>       // Create column definitions for target table
>       val targetColumns = validTypePairs.map(p => s"${p.columnName} 
> ${p.targetType}").mkString(",\n  ")
>       // Create target table
>       spark.sql(
>         s"""
>            |create table $targetTable (
>            |  id int,
>            |  $targetColumns,
>            |  ts long
>            |) using hudi
>            |location '${tmp.getCanonicalPath}/$targetTable'
>            |tblproperties (
>            |  type = '$tableType',
>            |  primaryKey = 'id',
>            |  preCombineField = 'ts'
>            |)
>        """.stripMargin)
>       // Insert initial data into target table with null values
>       val targetInsertValues = validTypePairs.map(_ => "null").mkString(", ")
>       spark.sql(
>         s"""
>            |insert into $targetTable
>            |select 1 as id, $targetInsertValues, 1000 as ts
>        """.stripMargin)
>       // Generate source values with explicit casting
>       val sourceValues = validTypePairs.map(p => s"cast(${p.testValue} as 
> ${p.sourceType}) as ${p.columnName}").mkString(",\n")
>       // Perform merge operation using inline subquery
>       spark.sql(
>         s"""
>            |merge into $targetTable t
>            |using (
>            |  select 
>            |    1 as id,
>            |    $sourceValues,
>            |    cast(1001 as long) as ts
>            |) s
>            |on t.id = s.id
>            |when matched then update set *
>            |when not matched then insert *
>        """.stripMargin)
>       // Verify results
>       val c = validTypePairs.map(p => s"${p.columnName}").mkString(",\n  ")
>       val result = spark.sql(s"select $c from $targetTable where id = 
> 1").collect()(0)
>       validTypePairs.zipWithIndex.foreach { case (pair, idx) =>
>         val actualValue = result.get(idx)
>         assert(actualValue == pair.expectedValue,
>           s"${tableType.toUpperCase}: Column ${pair.columnName} - Expected 
> ${pair.expectedValue} (${pair.expectedValue.getClass}) but got $actualValue 
> (${if (actualValue != null) actualValue.getClass else "null"})")
>       }
>       // Test insert case using inline subquery
>       val sourceValues2 = validTypePairs.map(p => s"cast(${p.testValue} as 
> ${p.sourceType})").mkString(", ")
>       spark.sql(
>         s"""
>            |merge into $targetTable t
>            |using (
>            |  select 
>            |    2 as id,
>            |    $sourceValues2,
>            |    1002 as ts
>            |) s
>            |on t.id = s.id
>            |when matched then update set *
>            |when not matched then insert *
>        """.stripMargin)
>       // Verify inserted row
>       val result2 = spark.sql(s"select * from $targetTable where id = 
> 2").collect()(0)
>       validTypePairs.zipWithIndex.foreach { case (pair, idx) =>
>         val actualValue = result2.get(idx + 1)
>         assert(actualValue != pair.expectedValue,
>           s"${tableType.toUpperCase}: Insert - Column ${pair.columnName} - 
> Expected ${pair.expectedValue} (${pair.expectedValue.getClass}) but got 
> $actualValue (${if (actualValue != null) actualValue.getClass else "null"})")
>       }
>     }
>   }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to