arunb2w opened a new issue, #6626:
URL: https://github.com/apache/hudi/issues/6626
I am getting the error `pyspark.sql.utils.AnalysisException: Invalidate Merge-On
condition:` when running the code below.
Hudi Version: 0.10
Spark version: 3.1.2
**Sample code**
```
target_df = spark.read.format("hudi").load(basePath)
print("###################################")
print(target_df.printSchema())
# # target_df.show()
target_datatype_map = {}
for name, dtype in target_df.dtypes:
    target_datatype_map[name] = dtype
print(str(target_datatype_map))
print("###################################")
for col in columns:
    if has_column(deflateDf, col):
        deflateDf = deflateDf.withColumn(col, F.col(col))
    else:
        deflateDf = deflateDf.withColumn(col, F.lit(None))
deflateDf.createOrReplaceTempView("deflate_table")
create_sql = "create table RESULTDATA using hudi location '/tmp/RESULTDATA_mor'"
spark.sql(create_sql)
merge_sql = """
merge into RESULTDATA as target
using (
select * from deflate_table as deflate
)
on target._context_id_ = deflate._context_id_ and target.id =
deflate.id
when matched
then update set
target.CREATED = cast(if(array_contains(deflate.changed_cols,
'CREATED'), deflate.CREATED, target.CREATED) as timestamp),target.CREATEDBY =
cast(if(array_contains(deflate.changed_cols, 'CREATEDBY'), deflate.CREATEDBY,
target.CREATEDBY) as string),target.DELETED =
cast(if(array_contains(deflate.changed_cols, 'DELETED'), deflate.DELETED,
target.DELETED) as timestamp),target.DELETEDBY =
cast(if(array_contains(deflate.changed_cols, 'DELETEDBY'), deflate.DELETEDBY,
target.DELETEDBY) as string),target.EXPIRATIONDATE =
cast(if(array_contains(deflate.changed_cols, 'EXPIRATIONDATE'),
deflate.EXPIRATIONDATE, target.EXPIRATIONDATE) as timestamp),target.ID =
cast(if(array_contains(deflate.changed_cols, 'ID'), deflate.ID, target.ID) as
decimal(12,0)),target.KEY = cast(if(array_contains(deflate.changed_cols,
'KEY'), deflate.KEY, target.KEY) as string),target.LASTMODIFIED =
cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIED'),
deflate.LASTMODIFIED, target.LASTMODIFIED) as
timestamp),target.LASTMODIFIEDBY = cast(if(array_contains(deflate.changed_cols,
'LASTMODIFIEDBY'), deflate.LASTMODIFIEDBY, target.LASTMODIFIEDBY) as
string),target.ORDERING = cast(if(array_contains(deflate.changed_cols,
'ORDERING'), deflate.ORDERING, target.ORDERING) as
decimal(12,0)),target.RESULTID = cast(if(array_contains(deflate.changed_cols,
'RESULTID'), deflate.RESULTID, target.RESULTID) as
decimal(12,0)),target.REPORTINGPERIODTYPE =
cast(if(array_contains(deflate.changed_cols, 'REPORTINGPERIODTYPE'),
deflate.REPORTINGPERIODTYPE, target.REPORTINGPERIODTYPE) as
string),target.RESULTDATE = cast(if(array_contains(deflate.changed_cols,
'RESULTDATE'), deflate.RESULTDATE, target.RESULTDATE) as
timestamp),target.SATISFYINGNUMERATOR =
cast(if(array_contains(deflate.changed_cols, 'SATISFYINGNUMERATOR'),
deflate.SATISFYINGNUMERATOR, target.SATISFYINGNUMERATOR) as
decimal(12,0)),target.VALUE = cast(if(array_contains(deflate.changed_cols,
'VALUE'), deflate.VALUE, target.VALUE) as string),
target._ETL_RUN_ID_ = cast(if(array_contains(deflate.changed_cols, '_ETL_RUN_ID_'),
deflate._ETL_RUN_ID_, target._ETL_RUN_ID_) as
decimal(38,0)),target._ETL_MODIFIED_ =
cast(if(array_contains(deflate.changed_cols, '_ETL_MODIFIED_'),
deflate._ETL_MODIFIED_, target._ETL_MODIFIED_) as timestamp),target._EXTRACTED_
= cast(if(array_contains(deflate.changed_cols, '_EXTRACTED_'),
deflate._EXTRACTED_, target._EXTRACTED_) as
timestamp),target._SOURCE_EXTRACTED_ =
cast(if(array_contains(deflate.changed_cols, '_SOURCE_EXTRACTED_'),
deflate._SOURCE_EXTRACTED_, target._SOURCE_EXTRACTED_) as
timestamp),target._LAST_MODIFIED_SEQ_ =
cast(if(array_contains(deflate.changed_cols, '_LAST_MODIFIED_SEQ_'),
deflate._LAST_MODIFIED_SEQ_, target._LAST_MODIFIED_SEQ_) as
decimal(38,0)),target._SCHEMA_CLASS_ =
cast(if(array_contains(deflate.changed_cols, '_SCHEMA_CLASS_'),
deflate._SCHEMA_CLASS_, target._SCHEMA_CLASS_) as string),target._CONTEXT_ID_ =
cast(if(array_contains(deflate.changed_cols, '_CONTEXT_ID_'),
deflate._CONTEXT_ID_, target._CONTEXT_ID_) as decimal(12,0)),target._IS_DELETED_ =
cast(if(array_contains(deflate.changed_cols, '_IS_DELETED_'),
deflate._IS_DELETED_, target._IS_DELETED_) as boolean)
when not matched
then insert
(CREATED,CREATEDBY,DELETED,DELETEDBY,EXPIRATIONDATE,ID,KEY,LASTMODIFIED,LASTMODIFIEDBY,ORDERING,RESULTID,REPORTINGPERIODTYPE,RESULTDATE,SATISFYINGNUMERATOR,VALUE,_ETL_RUN_ID_,_ETL_MODIFIED_,_EXTRACTED_,_SOURCE_EXTRACTED_,_LAST_MODIFIED_SEQ_,_SCHEMA_CLASS_,_CONTEXT_ID_,_IS_DELETED_)
values (cast(deflate.CREATED as timestamp),cast(deflate.CREATEDBY as
string),cast(deflate.DELETED as timestamp),cast(deflate.DELETEDBY as
string),cast(deflate.EXPIRATIONDATE as timestamp),cast(deflate.ID as
decimal(12,0)),cast(deflate.KEY as string),cast(deflate.LASTMODIFIED as
timestamp),cast(deflate.LASTMODIFIEDBY as string),cast(deflate.ORDERING as
decimal(12,0)),cast(deflate.RESULTID as
decimal(12,0)),cast(deflate.REPORTINGPERIODTYPE as
string),cast(deflate.RESULTDATE as timestamp),cast(deflate.SATISFYINGNUMERATOR
as decimal(12,0)),cast(deflate.VALUE as string),cast(deflate._ETL_RUN_ID_ as
decimal(38,0)),cast(deflate._ETL_MODIFIED_ as
timestamp),cast(deflate._EXTRACTED_ as timestamp),
cast(deflate._SOURCE_EXTRACTED_ as timestamp),cast(deflate._LAST_MODIFIED_SEQ_ as
decimal(38,0)),cast(deflate._SCHEMA_CLASS_ as string),cast(deflate._CONTEXT_ID_
as decimal(12,0)),cast(deflate._IS_DELETED_ as boolean))
"""
spark.sql(merge_sql)
```
When I run the above sample code using the command below, I get the following error:
```
spark-submit --master local --packages
org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.1.2
--jars
file:///Users/parunkarthick/Downloads/spark-snowflake_2.12-2.10.0-spark_3.1.jar,file:///Users/parunkarthick/Downloads/snowflake-jdbc-3.13.14.jar
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
--conf
'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
--driver-memory 1g --executor-memory 1g main.py
```
**Error stacktrace**
`22/09/07 18:47:12 INFO HoodieTableMetaClient: Finished Loading Table of
type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from
file:///tmp/RESULTDATA_mor
22/09/07 18:47:12 INFO HoodieActiveTimeline: Loaded instants upto :
Option{val=[20220907150126010__deltacommit__COMPLETED]}
Traceback (most recent call last):
File "/Users/parunkarthick/cdc-poc/main.py", line 971, in <module>
process_table(deflate_df, tableName, table_cols[tableNames[0]],
concurrent_write_enabled, delete_insert_enabled)
File "/Users/parunkarthick/cdc-poc/main.py", line 767, in process_table
merge_into_hudi(table_name, df, table_cols)
File "/Users/parunkarthick/cdc-poc/main.py", line 599, in merge_into_hudi
target_rows = spark.sql(sql)
File
"/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/session.py",
line 723, in sql
File
"/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1304, in __call__
File
"/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py",
line 117, in deco
pyspark.sql.utils.AnalysisException: Invalidate Merge-On condition:
(CAST(target.`id` AS DECIMAL(20,0)) = CAST(CAST(deflate.`id` AS DECIMAL(20,0))
AS DECIMAL(20,0))).The validate condition should be 'targetColumn =
sourceColumnExpression', e.g. t.id = s.id and t.dt = from_unixtime(s.ts)`
I would really appreciate it if anyone could help with this issue, or point me in
the right direction in case I've missed anything.
Thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]