Davis-Zhang-Onehouse opened a new issue, #12180:
URL: https://github.com/apache/hudi/issues/12180

   
   **Describe the problem you faced**
   Query suite toy example
   Hudi 0.14
   spark 3.4
   
   when we do merge into with extra condition I got error MIT failed
   ```
   MERGE INTO hudi_table_mor_2_partition_columns t
   USING comprehensive_merge_source s
   ON t.id = s.id
   WHEN MATCHED 
     AND s.operation = 'UPDATE_DEPT_MATCH' 
     AND t.department = s.department 
   THEN
     UPDATE SET *;
   ```
   Full stack trace
   ```
   Error: org.apache.hive.service.cli.HiveSQLException: Error running query: 
org.apache.hudi.exception.HoodieException: Merge into Hoodie table command 
failed
        at 
org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:46)
        at 
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:261)
        at 
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:165)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
        at 
org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
        at 
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:40)
        at 
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:165)
        at 
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:160)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/javax.security.auth.Subject.doAs(Unknown Source)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
        at 
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:174)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
        at java.base/java.lang.Thread.run(Unknown Source)
   Caused by: org.apache.hudi.exception.HoodieException: Merge into Hoodie 
table command failed
        at 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:441)
        at 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:282)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:218)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:95)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
        at 
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:226)
        ... 16 more (state=,code=0)
   ```
   
   Full query suite repro the issue
   
   
   ```
    CREATE TEMPORARY VIEW comprehensive_merge_source AS
     SELECT * FROM (
       -- Update all columns based on department match
       SELECT 
         1 as id,
         'John Doe' as name,
         34.99 as price,
         1599490800 as ts,
         '[email protected]' as email,
         'Engineering' as department,
         80000 as salary,
         to_date('2023-01-15') as hire_date,
         true as is_manager,
         'UPDATE_DEPT_MATCH' as operation
       UNION ALL
       -- Update based on salary condition
       SELECT
         3 as id,
         'Bob Smith' as name,
         44.99 as price,
         1599577200 as ts,
         '[email protected]' as email,
         'Engineering' as department,
         90000 as salary,
         to_date('2023-02-01') as hire_date,
         true as is_manager,
         'UPDATE_SALARY' as operation
       UNION ALL
       -- Delete record
       SELECT
         4 as id,
         'Alice Johnson' as name,
         0.0 as price,
         1599490800 as ts,
         '[email protected]' as email,
         'Terminated' as department,
         0 as salary,
         CAST(NULL AS DATE) as hire_date,
         false as is_manager,
         'DELETE' as operation
       UNION ALL
       -- Insert new record (Engineering)
       SELECT
         10 as id,
         'Peter Parker' as name,
         29.99 as price,
         1599577200 as ts,
         '[email protected]' as email,
         'Engineering' as department,
         60000 as salary,
         to_date('2022-09-01') as hire_date,
         false as is_manager,
         'INSERT' as operation
       UNION ALL
       -- Insert new record (Marketing)
       SELECT
         7 as id,
         'Emma Watson' as name,
         44.99 as price,
         1599577200 as ts,
         '[email protected]' as email,
         'Marketing' as department,
         85000 as salary,
         to_date('2022-08-15') as hire_date,
         true as is_manager,
         'INSERT' as operation
     )
   ;
    CREATE TABLE hudi_table_mor_2_partition_columns (
     id INT,
     name STRING,
     price DOUBLE,
     ts BIGINT
   ) USING hudi
   LOCATION 
's3a://bucket-fb543790/lakes/observed-default/sql/hudi_table_mor_2_partition_columns/'
   PARTITIONED BY (id,name)
   TBLPROPERTIES (
     type = 'mor',
     primaryKey = 'id',
     
     preCombineField = 'ts'
   )
   ;
    INSERT INTO hudi_table_mor_2_partition_columns
   PARTITION (id,name)
   SELECT id, name, price, ts
   FROM (
     SELECT 1 as id, 'John Doe' as name, 19.99 as price, 1598886000 as ts
     UNION ALL
     SELECT 2, 'Jane Doe', 24.99, 1598972400
     UNION ALL
     SELECT 3, 'Bob Smith', 14.99, 1599058800
   )
   ;
   
   INSERT INTO
   hudi_table_mor_2_partition_columns
   PARTITION (id,name)
   SELECT id, name, price, ts
   FROM (
     SELECT 4 as id, 'Alice Johnson' as name, 34.99 as price, 1599145200 as ts
   )
   ;
    SELECT id, name, price, ts 
   FROM hudi_table_mor_2_partition_columns 
   ORDER BY id
   ;
    ALTER TABLE
   hudi_table_mor_2_partition_columns
   ADD COLUMN email STRING
   ;
   
    INSERT INTO hudi_table_mor_2_partition_columns
   PARTITION (id,name)
   SELECT id, name, price, ts, email
   FROM (
     SELECT 6 as id,
            'Tom Hanks' as name,
             44.99 as price,
             1599318000 as ts,
             '[email protected]' as email
   )
   ;
   
    ALTER TABLE hudi_table_mor_2_partition_columns
   ADD COLUMNS (
     department STRING,
     salary INT,
     hire_date DATE,
     is_manager BOOLEAN
   )
   ;
    UPDATE hudi_table_mor_2_partition_columns
   SET department = 'HR', salary = 60000, hire_date = to_date('2022-01-15'), 
is_manager = true
   WHERE id = 1
   ;
    INSERT INTO hudi_table_mor_2_partition_columns
   PARTITION (id,name)
   SELECT id, name, price, ts, email, department, salary, hire_date, is_manager
   FROM (
     SELECT 
       6 as id, 
       'Tom Hanks' as name, 
       44.99 as price, 
       1599318000 as ts,
       '[email protected]' as email, 
       'Sales' as department, 
       75000 as salary,
       to_date('2022-03-01') as hire_date, 
       true as is_manager
   )
   ;
    INSERT INTO hudi_table_mor_2_partition_columns
   (id, name, price, ts, email, department, hire_date)
   VALUES (
     7 as id,
     'Emma Watson' as name,
     39.99 as price,
     1599404400 as ts,
     '[email protected]' as email,
     'Marketing' as department,
     CAST('2022-03-02' AS DATE) as hire_date)
   ;
   
   MERGE INTO hudi_table_mor_2_partition_columns t
   USING comprehensive_merge_source s
   ON t.id = s.id
   WHEN MATCHED 
     AND s.operation = 'UPDATE_DEPT_MATCH' 
     AND t.department = s.department 
   THEN
     UPDATE SET *
   WHEN MATCHED 
     AND s.operation = 'UPDATE_SALARY'
     AND s.salary > t.salary 
   THEN
     UPDATE SET *
   WHEN MATCHED 
     AND s.operation = 'DELETE' 
   THEN
     DELETE
   WHEN NOT MATCHED 
     AND s.operation = 'INSERT'
     AND (
       s.department = 'Engineering' 
       OR s.salary >= 70000
     )
   THEN
     INSERT *
   ;
   
   ```
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1.
   2.
   3.
   4.
   
   **Expected behavior**
   
   A clear and concise description of what you expected to happen.
   
   **Environment Description**
   
   * Hudi version :0.14
   
   * Spark version :3.4.3
   
   * Hive version : 2.3.10
   
   * Hadoop version :3.3.4
   
   * Storage (HDFS/S3/GCS..) :S3
   
   * Running on Docker? (yes/no) : yes
   
   
   **Additional context**
   
   Add any other context about the problem here.
   
   **Stacktrace**
   
   ```Add the stacktrace of the error.```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to