[ 
https://issues.apache.org/jira/browse/HUDI-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-4861:
---------------------------------
    Labels: pull-request-available  (was: )

> Relax MERGE INTO restrictions to permit casting of the matching condition
> -------------------------------------------------------------------------
>
>                 Key: HUDI-4861
>                 URL: https://issues.apache.org/jira/browse/HUDI-4861
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: spark-sql
>    Affects Versions: 0.12.0
>            Reporter: Alexey Kudinkin
>            Assignee: Alexey Kudinkin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.12.1
>
>
> Reported by user:
> [https://github.com/apache/hudi/issues/6626]
>  
> Following code
>  
> {code:java}
>      target_df = spark.read.format("hudi").load(basePath)
>     print("###################################")
>     print(target_df.printSchema())
>     # # target_df.show()
>     target_datatype_map = {}
>     for name, dtype in target_df.dtypes:
>         target_datatype_map[name] = dtype
>     print(str(target_datatype_map))
>     print("###################################")
>     for col in columns:
>         if has_column(deflateDf, col):
>             deflateDf = deflateDf.withColumn(col, F.col(col))
>         else:
>             deflateDf = deflateDf.withColumn(col, F.lit(None))
>     deflateDf.createOrReplaceTempView("deflate_table")
>     create_sql = "create table RESULTDATA using hudi location 
> '/tmp/RESULTDATA_mor'"
>     spark.sql(create_sql)
>     
>     merge_sql = """
>     merge into RESULTDATA as target
>         using (
>             select * from deflate_table as deflate
>         )
>         on target._context_id_ = deflate._context_id_ and target.id = 
> deflate.id
>         when matched
>         then update set
>         target.CREATED = cast(if(array_contains(deflate.changed_cols, 
> 'CREATED'), deflate.CREATED, target.CREATED) as timestamp),target.CREATEDBY = 
> cast(if(array_contains(deflate.changed_cols, 'CREATEDBY'), deflate.CREATEDBY, 
> target.CREATEDBY) as string),target.DELETED = 
> cast(if(array_contains(deflate.changed_cols, 'DELETED'), deflate.DELETED, 
> target.DELETED) as timestamp),target.DELETEDBY = 
> cast(if(array_contains(deflate.changed_cols, 'DELETEDBY'), deflate.DELETEDBY, 
> target.DELETEDBY) as string),target.EXPIRATIONDATE = 
> cast(if(array_contains(deflate.changed_cols, 'EXPIRATIONDATE'), 
> deflate.EXPIRATIONDATE, target.EXPIRATIONDATE) as timestamp),target.ID = 
> cast(if(array_contains(deflate.changed_cols, 'ID'), deflate.ID, target.ID) as 
> decimal(12,0)),target.KEY = cast(if(array_contains(deflate.changed_cols, 
> 'KEY'), deflate.KEY, target.KEY) as string),target.LASTMODIFIED = 
> cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIED'), 
> deflate.LASTMODIFIED, target.LASTMODIFIED) as 
> timestamp),target.LASTMODIFIEDBY = 
> cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIEDBY'), 
> deflate.LASTMODIFIEDBY, target.LASTMODIFIEDBY) as string),target.ORDERING = 
> cast(if(array_contains(deflate.changed_cols, 'ORDERING'), deflate.ORDERING, 
> target.ORDERING) as decimal(12,0)),target.RESULTID = 
> cast(if(array_contains(deflate.changed_cols, 'RESULTID'), deflate.RESULTID, 
> target.RESULTID) as decimal(12,0)),target.REPORTINGPERIODTYPE = 
> cast(if(array_contains(deflate.changed_cols, 'REPORTINGPERIODTYPE'), 
> deflate.REPORTINGPERIODTYPE, target.REPORTINGPERIODTYPE) as 
> string),target.RESULTDATE = cast(if(array_contains(deflate.changed_cols, 
> 'RESULTDATE'), deflate.RESULTDATE, target.RESULTDATE) as 
> timestamp),target.SATISFYINGNUMERATOR = 
> cast(if(array_contains(deflate.changed_cols, 'SATISFYINGNUMERATOR'), 
> deflate.SATISFYINGNUMERATOR, target.SATISFYINGNUMERATOR) as 
> decimal(12,0)),target.VALUE = cast(if(array_contains(deflate.changed_cols, 
> 'VALUE'), deflate.VALUE, target.VALUE) as string),target._ETL_RUN_ID_ = 
> cast(if(array_contains(deflate.changed_cols, '_ETL_RUN_ID_'), 
> deflate._ETL_RUN_ID_, target._ETL_RUN_ID_) as 
> decimal(38,0)),target._ETL_MODIFIED_ = 
> cast(if(array_contains(deflate.changed_cols, '_ETL_MODIFIED_'), 
> deflate._ETL_MODIFIED_, target._ETL_MODIFIED_) as 
> timestamp),target._EXTRACTED_ = cast(if(array_contains(deflate.changed_cols, 
> '_EXTRACTED_'), deflate._EXTRACTED_, target._EXTRACTED_) as 
> timestamp),target._SOURCE_EXTRACTED_ = 
> cast(if(array_contains(deflate.changed_cols, '_SOURCE_EXTRACTED_'), 
> deflate._SOURCE_EXTRACTED_, target._SOURCE_EXTRACTED_) as 
> timestamp),target._LAST_MODIFIED_SEQ_ = 
> cast(if(array_contains(deflate.changed_cols, '_LAST_MODIFIED_SEQ_'), 
> deflate._LAST_MODIFIED_SEQ_, target._LAST_MODIFIED_SEQ_) as 
> decimal(38,0)),target._SCHEMA_CLASS_ = 
> cast(if(array_contains(deflate.changed_cols, '_SCHEMA_CLASS_'), 
> deflate._SCHEMA_CLASS_, target._SCHEMA_CLASS_) as string),target._CONTEXT_ID_ 
> = cast(if(array_contains(deflate.changed_cols, '_CONTEXT_ID_'), 
> deflate._CONTEXT_ID_, target._CONTEXT_ID_) as 
> decimal(12,0)),target._IS_DELETED_ = 
> cast(if(array_contains(deflate.changed_cols, '_IS_DELETED_'), 
> deflate._IS_DELETED_, target._IS_DELETED_) as boolean)
>         when not matched
>         then insert
>         
> (CREATED,CREATEDBY,DELETED,DELETEDBY,EXPIRATIONDATE,ID,KEY,LASTMODIFIED,LASTMODIFIEDBY,ORDERING,RESULTID,REPORTINGPERIODTYPE,RESULTDATE,SATISFYINGNUMERATOR,VALUE,_ETL_RUN_ID_,_ETL_MODIFIED_,_EXTRACTED_,_SOURCE_EXTRACTED_,_LAST_MODIFIED_SEQ_,_SCHEMA_CLASS_,_CONTEXT_ID_,_IS_DELETED_)
>  values (cast(deflate.CREATED as timestamp),cast(deflate.CREATEDBY as 
> string),cast(deflate.DELETED as timestamp),cast(deflate.DELETEDBY as 
> string),cast(deflate.EXPIRATIONDATE as timestamp),cast(deflate.ID as 
> decimal(12,0)),cast(deflate.KEY as string),cast(deflate.LASTMODIFIED as 
> timestamp),cast(deflate.LASTMODIFIEDBY as string),cast(deflate.ORDERING as 
> decimal(12,0)),cast(deflate.RESULTID as 
> decimal(12,0)),cast(deflate.REPORTINGPERIODTYPE as 
> string),cast(deflate.RESULTDATE as 
> timestamp),cast(deflate.SATISFYINGNUMERATOR as 
> decimal(12,0)),cast(deflate.VALUE as string),cast(deflate._ETL_RUN_ID_ as 
> decimal(38,0)),cast(deflate._ETL_MODIFIED_ as 
> timestamp),cast(deflate._EXTRACTED_ as 
> timestamp),cast(deflate._SOURCE_EXTRACTED_ as 
> timestamp),cast(deflate._LAST_MODIFIED_SEQ_ as 
> decimal(38,0)),cast(deflate._SCHEMA_CLASS_ as 
> string),cast(deflate._CONTEXT_ID_ as decimal(12,0)),cast(deflate._IS_DELETED_ 
> as boolean))
>         """
>     spark.sql(merge_sql) {code}
>  
>  
> Results in the exception being thrown:
>  
> {code:java}
> /09/07 18:47:12 INFO HoodieTableMetaClient: Finished Loading Table of type 
> MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from 
> file:///tmp/RESULTDATA_mor
> 22/09/07 18:47:12 INFO HoodieActiveTimeline: Loaded instants upto : 
> Option{val=[20220907150126010__deltacommit__COMPLETED]}
> Traceback (most recent call last):
>   File "/Users/parunkarthick/cdc-poc/main.py", line 971, in <module>
>     process_table(deflate_df, tableName, table_cols[tableNames[0]], 
> concurrent_write_enabled, delete_insert_enabled)
>   File "/Users/parunkarthick/cdc-poc/main.py", line 767, in process_table
>     merge_into_hudi(table_name, df, table_cols)
>   File "/Users/parunkarthick/cdc-poc/main.py", line 599, in merge_into_hudi
>     target_rows = spark.sql(sql)
>   File 
> "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/session.py",
>  line 723, in sql
>   File 
> "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>  line 1304, in __call__
>   File 
> "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py",
>  line 117, in deco
> pyspark.sql.utils.AnalysisException: Invalidate Merge-On condition: 
> (CAST(target.`id` AS DECIMAL(20,0)) = CAST(CAST(deflate.`id` AS 
> DECIMAL(20,0)) AS DECIMAL(20,0))).The validate condition should be 
> 'targetColumn = sourceColumnExpression', e.g. t.id = s.id and t.dt = 
> from_unixtime(s.ts) {code}
>  
> This occurs due to the fact that current impl of 
> {{MergeIntoHoodieTableCommand}} restricts target table's primary key to be 
> just an {{{}AttributeReference{}}}, which in this case is wrapped into a 
> {{Cast}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to