Davis-Zhang-Onehouse opened a new pull request, #12215:
URL: https://github.com/apache/hudi/pull/12215

   
   
   ### Change Logs
   
   
   ### Merge into updated & insert actions will fail the query once detect 
column type mismatch for primary key/partition key/precombined key
   
   Allowing implicit casting of partition key can leads to partition path 
corruption. An example is if we do MIT where source table has a double value 
column and target table partitions on the same column with int type, the 
partition path will contain double value. Later select queries over the table 
will complain it cannot convert double to int.
   
   Allowing implicit casting of primary key can leads to data correctness 
issue. Here is an example:
   
           // Create target table with double primary key
           spark.sql(
             s"""
                |create table $targetTable (
                |  id double,
                |  name string,
                |  value_double double,
                |  ts long
                |) using hudi
                |location '${tmp.getCanonicalPath}/$targetTable'
                |tblproperties (
                |  type = '$tableType',
                |  primaryKey = 'id',
                |  preCombineField = 'ts'
                |)
            """.stripMargin)
   
           // Create source table with int primary key
           spark.sql(
             s"""
                |create table $sourceTable (
                |  id int,
                |  name string,
                |  value_double double,
                |  ts long
                |) using hudi
                |location '${tmp.getCanonicalPath}/$sourceTable'
                |tblproperties (
                |  type = '$tableType',
                |  primaryKey = 'id',
                |  preCombineField = 'ts'
                |)
            """.stripMargin)
   
           // Insert initial data into target table
           spark.sql(
             s"""
                |insert into $targetTable
                |select
                |  cast(1 as double) as id,
                |  'initial1' as name,
                |  1.1 as value_double,
                |  1000 as ts
            """.stripMargin)
   
           // Insert data into source table with int ids
           spark.sql(
             s"""
                |insert into $sourceTable
                |select
                |  1 as id,
                |  'updated1' as name,
                |  1.11 as value_double,
                |  1001 as ts
            """.stripMargin)
   
           // Perform merge operation
           spark.sql(
             s"""
                |merge into $targetTable t
                |using $sourceTable s
                |on t.id = s.id
                |when matched then update set *
                |when not matched then insert *
            """.stripMargin)
   
   In the end we expect target table with 1 record
   (1, updated1, 1.1, 1001)
   
   but actually it is
   (1, initial, 1.1, 1000)
   (1, updated1, 1.1, 1001)
   
   For precombined field we enforce the same data type check to avoid any 
potential data correctness issue.
   
   If target table id column is int, we got the expected behavior.
   
   ### Merge into delete action continue to follow today's behavior
   
   Delete action does not require strict data type matching, as long as the 
column types are cast-able from source column to target column, it is allowed.
   
   - Unlike insert/update which contains assignment from source to target, 
delete operation does not assign values but just comparing them in the ON 
clause, we just need to ensure the comparison part accounts for type 
mismatches. Especially, the precombined key column is out of scope as it only 
takes effect when assignment happens. Similarily, if partition key/primary key 
are not involved in ON clause, we don't need to do any check.
   - For condition clause, the recordKeyAttributeToConditionExpression variable 
already takes care of data type handling for both columns. Today's behavior is 
it will do best effort type casting to match source column data type to target, 
if cast succeeds everything works as expected, otherwise incompatible data type 
error is thrown.
   
   ### For all other data column types, implicit type casting is allowed and 
validated.
   
   Since MIT only requires column type matches for the 3 types of columns, for 
others spark-hudi did implicit type casting. Tests are written to capture the 
exhaustive behavior of such handling.
   
   handle delete MIT action + exhaustive coverage
   
   ### Impact
   
   Merge into insert & update actions now enforce strict column data type 
matching for primary key, precombined key and partition key. Queries previously 
work might error out.
   
   ### Risk level (write none, low medium or high below)
   
   Low
   
   ### Documentation Update
   
   We should update the merge into doc about this new restriction.
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to