Davis Zhang created HUDI-9081:
---------------------------------
Summary: Type casting in MERGE INTO hits an issue when the source is an inline subquery
Key: HUDI-9081
URL: https://issues.apache.org/jira/browse/HUDI-9081
Project: Apache Hudi
Issue Type: Bug
Reporter: Davis Zhang
For the same test, if I first save the values into a source table and then run
MERGE INTO (MIT), it works fine; the problem only appears when the source is an
inline subquery, as in the reproduction below.
{code:java}
test("Test All Valid Type Casting For Merge Into and Insert") {
  // One entry per cast scenario: the SQL source/target column types, the SQL
  // literal used as the source value, the JVM value we expect to read back
  // from the target table, and the column name used for that scenario.
  case class ColumnTypePair(sourceType: String,
      targetType: String,
      testValue: String,
      expectedValue: Any,
      columnName: String)

  // Source -> target type combinations that Spark permits as implicit
  // up-casts, so MERGE INTO / INSERT should accept all of them.
  val validTypePairs = Seq(
    // Numeric widening pairs
    ColumnTypePair("tinyint", "smallint", "127", 127, "tiny_to_small"),
    ColumnTypePair("tinyint", "int", "127", 127, "tiny_to_int"),
    ColumnTypePair("tinyint", "bigint", "127", 127L, "tiny_to_big"),
    ColumnTypePair("tinyint", "float", "127", 127.0f, "tiny_to_float"),
    ColumnTypePair("tinyint", "double", "127", 127.0d, "tiny_to_double"),
    ColumnTypePair("tinyint", "decimal(10,1)", "127",
      java.math.BigDecimal.valueOf(127.0), "tiny_to_decimal"),
    ColumnTypePair("smallint", "int", "32767", 32767, "small_to_int"),
    ColumnTypePair("smallint", "bigint", "32767", 32767L, "small_to_big"),
    ColumnTypePair("smallint", "float", "32767", 32767.0f, "small_to_float"),
    ColumnTypePair("smallint", "double", "32767", 32767.0d, "small_to_double"),
    ColumnTypePair("smallint", "decimal(10,1)", "32767",
      java.math.BigDecimal.valueOf(32767.0), "small_to_decimal"),
    ColumnTypePair("int", "bigint", "2147483647", 2147483647L, "int_to_big"),
    ColumnTypePair("int", "float", "2147483647", 2147483647.0f, "int_to_float"),
    ColumnTypePair("int", "double", "2147483647", 2147483647.0d, "int_to_double"),
    ColumnTypePair("int", "decimal(10,1)", "22",
      java.math.BigDecimal.valueOf(22.0), "int_to_decimal"),
    // 3.14f widened to double keeps the float representation error, hence
    // the exact expected literal below.
    ColumnTypePair("float", "double", "3.14", 3.140000104904175d, "float_to_double"),
    ColumnTypePair("float", "decimal(10,2)", "3.14",
      java.math.BigDecimal.valueOf(3.14).setScale(2, java.math.RoundingMode.HALF_UP),
      "float_to_decimal"),
    // Timestamp/Date conversions
    ColumnTypePair("timestamp", "string", "timestamp'2023-01-01 12:00:00'",
      "2023-01-01 12:00:00", "ts_to_string"),
    ColumnTypePair("timestamp", "date", "timestamp'2023-01-01 12:00:00'",
      java.sql.Date.valueOf("2023-01-01"), "ts_to_date"),
    ColumnTypePair("date", "string", "date'2023-01-01'", "2023-01-01", "date_to_string"),
    ColumnTypePair("date", "timestamp", "date'2023-01-01'",
      java.sql.Timestamp.valueOf("2023-01-01 00:00:00"), "date_to_ts"),
    // Boolean conversions
    ColumnTypePair("boolean", "string", "true", "true", "bool_to_string")
  )

  // Exercise both Hudi table types.
  Seq("cow", "mor").foreach { tableType =>
    withTempDir { tmp =>
      val targetTable = generateTableName

      // Column definitions for the target table, one per cast scenario.
      val targetColumns = validTypePairs
        .map(p => s"${p.columnName} ${p.targetType}")
        .mkString(",\n  ")

      // Create target table.
      spark.sql(
        s"""
           |create table $targetTable (
           |  id int,
           |  $targetColumns,
           |  ts long
           |) using hudi
           |location '${tmp.getCanonicalPath}/$targetTable'
           |tblproperties (
           |  type = '$tableType',
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           |)
         """.stripMargin)

      // Seed one row (id = 1) with nulls so the MERGE below takes the
      // "when matched then update" path for every cast column.
      val targetInsertValues = validTypePairs.map(_ => "null").mkString(", ")
      spark.sql(
        s"""
           |insert into $targetTable
           |select 1 as id, $targetInsertValues, 1000 as ts
         """.stripMargin)

      // Source projection with an explicit cast to the narrower source type,
      // so the MERGE must up-cast each value to the target column type.
      val sourceValues = validTypePairs
        .map(p => s"cast(${p.testValue} as ${p.sourceType}) as ${p.columnName}")
        .mkString(",\n")

      // UPDATE path: merge using an inline subquery as the source.
      spark.sql(
        s"""
           |merge into $targetTable t
           |using (
           |  select
           |    1 as id,
           |    $sourceValues,
           |    cast(1001 as long) as ts
           |) s
           |on t.id = s.id
           |when matched then update set *
           |when not matched then insert *
         """.stripMargin)

      // Verify every updated column reads back as the expected up-cast value.
      val c = validTypePairs.map(p => s"${p.columnName}").mkString(",\n  ")
      val result = spark.sql(s"select $c from $targetTable where id = 1").collect()(0)
      validTypePairs.zipWithIndex.foreach { case (pair, idx) =>
        val actualValue = result.get(idx)
        assert(actualValue == pair.expectedValue,
          s"${tableType.toUpperCase}: Column ${pair.columnName} - " +
            s"Expected ${pair.expectedValue} (${pair.expectedValue.getClass}) " +
            s"but got $actualValue " +
            s"(${if (actualValue != null) actualValue.getClass else "null"})")
      }

      // INSERT path: same inline-subquery source, but a new key (id = 2).
      val sourceValues2 = validTypePairs
        .map(p => s"cast(${p.testValue} as ${p.sourceType})")
        .mkString(", ")
      spark.sql(
        s"""
           |merge into $targetTable t
           |using (
           |  select
           |    2 as id,
           |    $sourceValues2,
           |    1002 as ts
           |) s
           |on t.id = s.id
           |when matched then update set *
           |when not matched then insert *
         """.stripMargin)

      // Verify the inserted row. "select *" yields id first, so the cast
      // columns start at index 1 (hence idx + 1).
      val result2 = spark.sql(s"select * from $targetTable where id = 2").collect()(0)
      validTypePairs.zipWithIndex.foreach { case (pair, idx) =>
        val actualValue = result2.get(idx + 1)
        // BUG FIX: the original asserted `actualValue != pair.expectedValue`,
        // which fires precisely when the value is correct and contradicts the
        // "Expected ... but got ..." message; equality is what is intended.
        assert(actualValue == pair.expectedValue,
          s"${tableType.toUpperCase}: Insert - Column ${pair.columnName} - " +
            s"Expected ${pair.expectedValue} (${pair.expectedValue.getClass}) " +
            s"but got $actualValue " +
            s"(${if (actualValue != null) actualValue.getClass else "null"})")
      }
    }
  }
} {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)