Davis Zhang created HUDI-9081:
---------------------------------

             Summary: Type casting in MERGE INTO hits an issue
                 Key: HUDI-9081
                 URL: https://issues.apache.org/jira/browse/HUDI-9081
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Davis Zhang


For the same test, if I first save the values in a source table and then run the
MERGE INTO (MIT) statement, it works fine.
{code:java}
test("Test All Valid Type Casting For Merge Into and Insert") {
  // One cast scenario: a SQL literal (testValue) written through sourceType
  // and the JVM value we expect to read back from the targetType column.
  case class ColumnTypePair(sourceType: String,
                            targetType: String,
                            testValue: String,
                            expectedValue: Any,
                            columnName: String)

  // Every source -> target cast combination that MERGE INTO must accept.
  val validTypePairs = Seq(
    // Numeric widening pairs
    ColumnTypePair("tinyint", "smallint", "127", 127, "tiny_to_small"),
    ColumnTypePair("tinyint", "int", "127", 127, "tiny_to_int"),
    ColumnTypePair("tinyint", "bigint", "127", 127L, "tiny_to_big"),
    ColumnTypePair("tinyint", "float", "127", 127.0f, "tiny_to_float"),
    ColumnTypePair("tinyint", "double", "127", 127.0d, "tiny_to_double"),
    ColumnTypePair("tinyint", "decimal(10,1)", "127",
      java.math.BigDecimal.valueOf(127.0), "tiny_to_decimal"),

    ColumnTypePair("smallint", "int", "32767", 32767, "small_to_int"),
    ColumnTypePair("smallint", "bigint", "32767", 32767L, "small_to_big"),
    ColumnTypePair("smallint", "float", "32767", 32767.0f, "small_to_float"),
    ColumnTypePair("smallint", "double", "32767", 32767.0d, "small_to_double"),
    ColumnTypePair("smallint", "decimal(10,1)", "32767",
      java.math.BigDecimal.valueOf(32767.0), "small_to_decimal"),

    ColumnTypePair("int", "bigint", "2147483647", 2147483647L, "int_to_big"),
    ColumnTypePair("int", "float", "2147483647", 2147483647.0f, "int_to_float"),
    ColumnTypePair("int", "double", "2147483647", 2147483647.0d, "int_to_double"),
    ColumnTypePair("int", "decimal(10,1)", "22",
      java.math.BigDecimal.valueOf(22.0), "int_to_decimal"),

    // 3.14 is not exactly representable as a float; the expected double is
    // the value obtained after float -> double widening of that float.
    ColumnTypePair("float", "double", "3.14", 3.140000104904175d, "float_to_double"),
    ColumnTypePair("float", "decimal(10,2)", "3.14",
      java.math.BigDecimal.valueOf(3.14).setScale(2, java.math.RoundingMode.HALF_UP),
      "float_to_decimal"),

    // Timestamp/Date conversions
    ColumnTypePair("timestamp", "string", "timestamp'2023-01-01 12:00:00'",
      "2023-01-01 12:00:00", "ts_to_string"),
    ColumnTypePair("timestamp", "date", "timestamp'2023-01-01 12:00:00'",
      java.sql.Date.valueOf("2023-01-01"), "ts_to_date"),
    ColumnTypePair("date", "string", "date'2023-01-01'", "2023-01-01", "date_to_string"),
    ColumnTypePair("date", "timestamp", "date'2023-01-01'",
      java.sql.Timestamp.valueOf("2023-01-01 00:00:00"), "date_to_ts"),

    // Boolean conversions
    ColumnTypePair("boolean", "string", "true", "true", "bool_to_string")
  )

  // Exercise both Hudi table types.
  Seq("cow", "mor").foreach { tableType =>
    withTempDir { tmp =>
      val targetTable = generateTableName

      // Column definitions for the target table, one column per scenario.
      val targetColumns = validTypePairs
        .map(p => s"${p.columnName} ${p.targetType}")
        .mkString(",\n  ")

      // Create target table
      spark.sql(
        s"""
           |create table $targetTable (
           |  id int,
           |  $targetColumns,
           |  ts long
           |) using hudi
           |location '${tmp.getCanonicalPath}/$targetTable'
           |tblproperties (
           |  type = '$tableType',
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           |)
       """.stripMargin)

      // Seed row id=1 with all-null cast columns so the MERGE below takes the
      // "when matched then update set *" branch.
      val targetInsertValues = validTypePairs.map(_ => "null").mkString(", ")
      spark.sql(
        s"""
           |insert into $targetTable
           |select 1 as id, $targetInsertValues, 1000 as ts
       """.stripMargin)

      // Source values explicitly cast to the (narrower) source type, aliased
      // to the target column names.
      val sourceValues = validTypePairs
        .map(p => s"cast(${p.testValue} as ${p.sourceType}) as ${p.columnName}")
        .mkString(",\n")

      // UPDATE path: MERGE using an inline subquery source; id=1 matches.
      spark.sql(
        s"""
           |merge into $targetTable t
           |using (
           |  select 
           |    1 as id,
           |    $sourceValues,
           |    cast(1001 as long) as ts
           |) s
           |on t.id = s.id
           |when matched then update set *
           |when not matched then insert *
       """.stripMargin)

      // Verify the updated row: each column must hold the widened value.
      val c = validTypePairs.map(p => s"${p.columnName}").mkString(",\n  ")
      val result = spark.sql(s"select $c from $targetTable where id = 1").collect()(0)
      validTypePairs.zipWithIndex.foreach { case (pair, idx) =>
        val actualValue = result.get(idx)
        assert(actualValue == pair.expectedValue,
          s"${tableType.toUpperCase}: Column ${pair.columnName} - Expected ${pair.expectedValue} (${pair.expectedValue.getClass}) but got $actualValue (${if (actualValue != null) actualValue.getClass else "null"})")
      }

      // INSERT path: same source values (no aliases needed for insert *),
      // id=2 does not match so the "when not matched" branch fires.
      val sourceValues2 = validTypePairs
        .map(p => s"cast(${p.testValue} as ${p.sourceType})")
        .mkString(", ")
      spark.sql(
        s"""
           |merge into $targetTable t
           |using (
           |  select 
           |    2 as id,
           |    $sourceValues2,
           |    1002 as ts
           |) s
           |on t.id = s.id
           |when matched then update set *
           |when not matched then insert *
       """.stripMargin)

      // Verify the inserted row. select * puts id first, hence idx + 1.
      // NOTE: the original paste asserted `!=` here, which contradicts both
      // the parallel update-path check above and the failure message; a test
      // of *valid* casts must assert equality.
      val result2 = spark.sql(s"select * from $targetTable where id = 2").collect()(0)
      validTypePairs.zipWithIndex.foreach { case (pair, idx) =>
        val actualValue = result2.get(idx + 1)
        assert(actualValue == pair.expectedValue,
          s"${tableType.toUpperCase}: Insert - Column ${pair.columnName} - Expected ${pair.expectedValue} (${pair.expectedValue.getClass}) but got $actualValue (${if (actualValue != null) actualValue.getClass else "null"})")
      }
    }
  }
} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to