[
https://issues.apache.org/jira/browse/HUDI-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Raymond Xu updated HUDI-4861:
-----------------------------
Sprint: 2022/09/19
> Relax MERGE INTO restrictions to permit casting of the matching condition
> -------------------------------------------------------------------------
>
> Key: HUDI-4861
> URL: https://issues.apache.org/jira/browse/HUDI-4861
> Project: Apache Hudi
> Issue Type: Bug
> Components: spark-sql
> Affects Versions: 0.12.0
> Reporter: Alexey Kudinkin
> Assignee: Alexey Kudinkin
> Priority: Critical
> Fix For: 0.12.1
>
>
> Reported by user:
> [https://github.com/apache/hudi/issues/6626]
>
> Following code
>
> {code:java}
> target_df = spark.read.format("hudi").load(basePath)
> print("###################################")
> print(target_df.printSchema())
> # # target_df.show()
> target_datatype_map = {}
> for name, dtype in target_df.dtypes:
> target_datatype_map[name] = dtype
> print(str(target_datatype_map))
> print("###################################")
> for col in columns:
> if has_column(deflateDf, col):
> deflateDf = deflateDf.withColumn(col, F.col(col))
> else:
> deflateDf = deflateDf.withColumn(col, F.lit(None))
> deflateDf.createOrReplaceTempView("deflate_table")
> create_sql = "create table RESULTDATA using hudi location
> '/tmp/RESULTDATA_mor'"
> spark.sql(create_sql)
>
> merge_sql = """
> merge into RESULTDATA as target
> using (
> select * from deflate_table as deflate
> )
> on target._context_id_ = deflate._context_id_ and target.id =
> deflate.id
> when matched
> then update set
> target.CREATED = cast(if(array_contains(deflate.changed_cols,
> 'CREATED'), deflate.CREATED, target.CREATED) as timestamp),target.CREATEDBY =
> cast(if(array_contains(deflate.changed_cols, 'CREATEDBY'), deflate.CREATEDBY,
> target.CREATEDBY) as string),target.DELETED =
> cast(if(array_contains(deflate.changed_cols, 'DELETED'), deflate.DELETED,
> target.DELETED) as timestamp),target.DELETEDBY =
> cast(if(array_contains(deflate.changed_cols, 'DELETEDBY'), deflate.DELETEDBY,
> target.DELETEDBY) as string),target.EXPIRATIONDATE =
> cast(if(array_contains(deflate.changed_cols, 'EXPIRATIONDATE'),
> deflate.EXPIRATIONDATE, target.EXPIRATIONDATE) as timestamp),target.ID =
> cast(if(array_contains(deflate.changed_cols, 'ID'), deflate.ID, target.ID) as
> decimal(12,0)),target.KEY = cast(if(array_contains(deflate.changed_cols,
> 'KEY'), deflate.KEY, target.KEY) as string),target.LASTMODIFIED =
> cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIED'),
> deflate.LASTMODIFIED, target.LASTMODIFIED) as
> timestamp),target.LASTMODIFIEDBY =
> cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIEDBY'),
> deflate.LASTMODIFIEDBY, target.LASTMODIFIEDBY) as string),target.ORDERING =
> cast(if(array_contains(deflate.changed_cols, 'ORDERING'), deflate.ORDERING,
> target.ORDERING) as decimal(12,0)),target.RESULTID =
> cast(if(array_contains(deflate.changed_cols, 'RESULTID'), deflate.RESULTID,
> target.RESULTID) as decimal(12,0)),target.REPORTINGPERIODTYPE =
> cast(if(array_contains(deflate.changed_cols, 'REPORTINGPERIODTYPE'),
> deflate.REPORTINGPERIODTYPE, target.REPORTINGPERIODTYPE) as
> string),target.RESULTDATE = cast(if(array_contains(deflate.changed_cols,
> 'RESULTDATE'), deflate.RESULTDATE, target.RESULTDATE) as
> timestamp),target.SATISFYINGNUMERATOR =
> cast(if(array_contains(deflate.changed_cols, 'SATISFYINGNUMERATOR'),
> deflate.SATISFYINGNUMERATOR, target.SATISFYINGNUMERATOR) as
> decimal(12,0)),target.VALUE = cast(if(array_contains(deflate.changed_cols,
> 'VALUE'), deflate.VALUE, target.VALUE) as string),target._ETL_RUN_ID_ =
> cast(if(array_contains(deflate.changed_cols, '_ETL_RUN_ID_'),
> deflate._ETL_RUN_ID_, target._ETL_RUN_ID_) as
> decimal(38,0)),target._ETL_MODIFIED_ =
> cast(if(array_contains(deflate.changed_cols, '_ETL_MODIFIED_'),
> deflate._ETL_MODIFIED_, target._ETL_MODIFIED_) as
> timestamp),target._EXTRACTED_ = cast(if(array_contains(deflate.changed_cols,
> '_EXTRACTED_'), deflate._EXTRACTED_, target._EXTRACTED_) as
> timestamp),target._SOURCE_EXTRACTED_ =
> cast(if(array_contains(deflate.changed_cols, '_SOURCE_EXTRACTED_'),
> deflate._SOURCE_EXTRACTED_, target._SOURCE_EXTRACTED_) as
> timestamp),target._LAST_MODIFIED_SEQ_ =
> cast(if(array_contains(deflate.changed_cols, '_LAST_MODIFIED_SEQ_'),
> deflate._LAST_MODIFIED_SEQ_, target._LAST_MODIFIED_SEQ_) as
> decimal(38,0)),target._SCHEMA_CLASS_ =
> cast(if(array_contains(deflate.changed_cols, '_SCHEMA_CLASS_'),
> deflate._SCHEMA_CLASS_, target._SCHEMA_CLASS_) as string),target._CONTEXT_ID_
> = cast(if(array_contains(deflate.changed_cols, '_CONTEXT_ID_'),
> deflate._CONTEXT_ID_, target._CONTEXT_ID_) as
> decimal(12,0)),target._IS_DELETED_ =
> cast(if(array_contains(deflate.changed_cols, '_IS_DELETED_'),
> deflate._IS_DELETED_, target._IS_DELETED_) as boolean)
> when not matched
> then insert
>
> (CREATED,CREATEDBY,DELETED,DELETEDBY,EXPIRATIONDATE,ID,KEY,LASTMODIFIED,LASTMODIFIEDBY,ORDERING,RESULTID,REPORTINGPERIODTYPE,RESULTDATE,SATISFYINGNUMERATOR,VALUE,_ETL_RUN_ID_,_ETL_MODIFIED_,_EXTRACTED_,_SOURCE_EXTRACTED_,_LAST_MODIFIED_SEQ_,_SCHEMA_CLASS_,_CONTEXT_ID_,_IS_DELETED_)
> values (cast(deflate.CREATED as timestamp),cast(deflate.CREATEDBY as
> string),cast(deflate.DELETED as timestamp),cast(deflate.DELETEDBY as
> string),cast(deflate.EXPIRATIONDATE as timestamp),cast(deflate.ID as
> decimal(12,0)),cast(deflate.KEY as string),cast(deflate.LASTMODIFIED as
> timestamp),cast(deflate.LASTMODIFIEDBY as string),cast(deflate.ORDERING as
> decimal(12,0)),cast(deflate.RESULTID as
> decimal(12,0)),cast(deflate.REPORTINGPERIODTYPE as
> string),cast(deflate.RESULTDATE as
> timestamp),cast(deflate.SATISFYINGNUMERATOR as
> decimal(12,0)),cast(deflate.VALUE as string),cast(deflate._ETL_RUN_ID_ as
> decimal(38,0)),cast(deflate._ETL_MODIFIED_ as
> timestamp),cast(deflate._EXTRACTED_ as
> timestamp),cast(deflate._SOURCE_EXTRACTED_ as
> timestamp),cast(deflate._LAST_MODIFIED_SEQ_ as
> decimal(38,0)),cast(deflate._SCHEMA_CLASS_ as
> string),cast(deflate._CONTEXT_ID_ as decimal(12,0)),cast(deflate._IS_DELETED_
> as boolean))
> """
> spark.sql(merge_sql) {code}
>
>
> Results in the exception being thrown:
>
> {code:java}
> 22/09/07 18:47:12 INFO HoodieTableMetaClient: Finished Loading Table of type
> MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from
> file:///tmp/RESULTDATA_mor
> 22/09/07 18:47:12 INFO HoodieActiveTimeline: Loaded instants upto :
> Option{val=[20220907150126010__deltacommit__COMPLETED]}
> Traceback (most recent call last):
> File "/Users/parunkarthick/cdc-poc/main.py", line 971, in <module>
> process_table(deflate_df, tableName, table_cols[tableNames[0]],
> concurrent_write_enabled, delete_insert_enabled)
> File "/Users/parunkarthick/cdc-poc/main.py", line 767, in process_table
> merge_into_hudi(table_name, df, table_cols)
> File "/Users/parunkarthick/cdc-poc/main.py", line 599, in merge_into_hudi
> target_rows = spark.sql(sql)
> File
> "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/session.py",
> line 723, in sql
> File
> "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1304, in __call__
> File
> "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py",
> line 117, in deco
> pyspark.sql.utils.AnalysisException: Invalidate Merge-On condition:
> (CAST(target.`id` AS DECIMAL(20,0)) = CAST(CAST(deflate.`id` AS
> DECIMAL(20,0)) AS DECIMAL(20,0))).The validate condition should be
> 'targetColumn = sourceColumnExpression', e.g. t.id = s.id and t.dt =
> from_unixtime(s.ts) {code}
>
> This occurs due to the fact that the current impl of
> {{MergeIntoHoodieTableCommand}} restricts the target table's primary key to be
> just an {{AttributeReference}}, which in this case is wrapped into a
> {{Cast}}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)