SudhirSaxena opened a new issue, #10587:
URL: https://github.com/apache/hudi/issues/10587

   Hi,
   I am facing issues with upsert mode in Hudi 0.14 using the Record Level Index (RLI) on EMR 6.15 with Spark 3.4.1.
   Insert mode works as expected, but upsert mode is not working against an existing Hudi table (created with a lower version), and I am hitting several issues. In short, I am getting a HoodieException for a config conflict (key, current value, existing value):
   hoodie.database.name : database name
   I am providing the same database name (the Hudi DB where the data is stored) both as the target and in the source Hudi config, yet when I try to upsert it throws this hoodie.database.name conflict.
   Can you please have a look and help me fix this issue?
   
   https://apache-hudi.slack.com/files/U06FTGAHNUD/F06FXBENRFU/img_4074.jpg
   https://apache-hudi.slack.com/files/U06FTGAHNUD/F06FZQRTT2Q/img_4073.jpg
   
   Below is the code snippet:
   
   hudi_config = {
               "className":"org.apache.hudi",
               "hoodie.table.name": tgt_tbl,
               "hoodie.datasource.write.recordkey.field": "id",
               "hoodie.datasource.write.precombine.field": "eff_fm_cent_tz",
               "hoodie.datasource.write.operation": "upsert",
               "hoodie.datasource.write.keygenerator.class": 
"org.apache.hudi.keygen.ComplexKeyGenerator",
               "hoodie.datasource.write.partitionpath.field": "year,month",
               "hoodie.datasource.hive_sync.support_timestamp": "true",
               "hoodie.datasource.hive_sync.enable": "true",
               "hoodie.datasource.hive_sync.assume_date_partitioning": "false",
               "hoodie.datasource.hive_sync.table": tgt_tbl,
               "hoodie.datasource.hive_sync.use_jdbc": "false",
               "hoodie.datasource.hive_sync.mode": "hms",
               "hoodie.datasource.hive_sync.partition_extractor_class": 
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
               "hoodie.datasource.write.hive_style_partitioning": "true",
               "hoodie.upsert.shuffle.parallelism": hudi_upsert_parallelism,
               "hoodie.delete.shuffle.parallelism": hudi_delete_parallelism,
               "hoodie.bulkinsert.sort.mode": "GLOBAL_SORT",
               "hoodie.metadata.enable": "true",
               "hoodie.metadata.record.index.enable": "true",
               "hoodie.index.type": "RECORD_INDEX",
               "hoodie.metadata.index.column.stats.column.list": 
"res_sys_id,pnr_rec_loc_id,pnr_cre_dt",
               "hoodie.enable.data.skipping": "true"
           }
           hudi_delete_config = {
               "hoodie.datasource.write.payload.class":
                   "org.apache.hudi.common.model.EmptyHoodieRecordPayload"
           }
           # Initialize local s3
           if (files_exist == True):
               print("upsert started")
               hudi_operation = "upsert"
               hudi_write_mode = "append"
               hudi_config["className"] = "org.apache.hudi"
               hudi_config["hoodie.table.keygenerator.class"] = 
"org.apache.hudi.keygen.ComplexKeyGenerator"
               hudi_config["hoodie.database.name"] = tgt_db
               hudi_config["hoodie.table.name"] = tgt_tbl
               hudi_config["hoodie.datasource.write.recordkey.field"] = "id"
               hudi_config["hoodie.datasource.write.operation"] = hudi_operation
               hudi_config["hoodie.datasource.hive_sync.table"] = tgt_tbl
               hudi_config["hoodie.datasource.hive_sync.database"] = tgt_db
               hudi_config["hoodie.index.type"] = "RECORD_INDEX"
               hudi_config["hoodie.metadata.enable"] = "true"
               hudi_config["hoodie.datasource.write.table.name"] = tgt_tbl
               hudi_config["hoodie.datasource.write.precombine.field"] = 
"eff_fm_cent_tz"
               hudi_config["hoodie.metadata.record.index.enable"] = "true"
               print("Upserting records into " + tgt_tbl + " Hudi table")
               
res_pnr_concat.write.format("org.apache.hudi").options(**hudi_config).mode("Append").save(hudi_tbl_path
 + tgt_tbl)
               print("Successfully process records for " + tgt_tbl + " Hudi 
table")
               print("res_pnr hudi loading for upsert ended 
----",datetime.now())
               if len(target_records) > 0:
                   print("delete logic started ----",datetime.now())
                   res_pnr_concat.createOrReplaceTempView("res_pnr_concat")
                   res_pnr_del_df=spark.sql("select distinct A.* from 
res_pnr_cached A where not exists(select 1 from res_pnr_concat B where 
A.RES_SYS_ID = B.RES_SYS_ID AND A.PNR_REC_LOC_ID = B.PNR_REC_LOC_ID AND 
A.PNR_CRE_DT = B.PNR_CRE_DT AND A.EFF_FM_CENT_TZ = B.eff_fm_cent_tz and A.year 
= B.year and A.month = 
B.month)").drop("src_hoodie_record_key").drop("_hoodie_commit_seqno").drop("_hoodie_commit_time").drop("_hoodie_file_name").drop("_hoodie_partition_path").drop("_hoodie_record_key")
                   if len(res_pnr_del_df.take(1)) > 0:
                       common_config = {**hudi_config, **hudi_delete_config}
                       spark.sql("uncache table if exists res_pnr_src_df")
                       print("Deleting records from RES_PNR Hudi table")
                       
res_pnr_del_df.write.format("org.apache.hudi").options(**common_config).mode("append").save(hudi_tbl_path
 + tgt_tbl)
                   else:
                       print("Delete eligible records are NOT identified")
                   print("delete logic Ended ----",datetime.now())
               else:
                   pass
               print("res_pnr hudi upsert logic ended ----",datetime.now())
           else:
               hudi_operation = "bulk_insert"
               hudi_write_mode = "overwrite"
               hudi_config["className"] = "org.apache.hudi"
               hudi_config["hoodie.table.name"] = tgt_tbl
               hudi_config["hoodie.database.name"] = tgt_db
               hudi_config["hoodie.datasource.write.operation"] = hudi_operation
               hudi_config["hoodie.datasource.hive_sync.table"] = tgt_tbl
               hudi_config["hoodie.datasource.write.recordkey.field"] = "id"
               hudi_config["hoodie.datasource.hive_sync.database"] = tgt_db
               hudi_config["hoodie.index.type"] = "RECORD_INDEX"
               hudi_config["hoodie.metadata.enable"] = "true"
               hudi_config["hoodie.datasource.write.table.name"] = tgt_tbl
               hudi_config["hoodie.datasource.write.precombine.field"] = 
"eff_fm_cent_tz"
               hudi_config["hoodie.metadata.record.index.enable"] = "true"
               print("bulk insert " + tgt_tbl + " Hudi table")
               
res_pnr_concat.write.format("org.apache.hudi").options(**hudi_config).mode(hudi_write_mode).save(hudi_tbl_path
 + tgt_tbl)
               print("Successfully bulk upserted records into " + tgt_tbl + " 
Hudi table")
           print("res_pnr hudi loading for upsert ended ----",datetime.now())
   
   **Upsert mode config:**
   
   hudi_operation = "upsert"
   hudi_write_mode = "append"
   hudi_config["className"] = "org.apache.hudi"
   hudi_config["hoodie.table.keygenerator.class"] = 
"org.apache.hudi.keygen.ComplexKeyGenerator"
   hudi_config["hoodie.table.name"] = tgt_tbl
   hudi_config["hoodie.database.name"] = tgt_db
   hudi_config["hoodie.datasource.write.recordkey.field"] = "id"
   hudi_config["hoodie.datasource.write.operation"] = hudi_operation
   hudi_config["hoodie.datasource.hive_sync.table"] = tgt_tbl
   hudi_config["hoodie.datasource.hive_sync.database"] = tgt_db
   hudi_config["hoodie.index.type"] = "RECORD_INDEX"
   hudi_config["hoodie.datasource.write.table.name"] = tgt_tbl
   hudi_config["hoodie.datasource.write.precombine.field"] = "eff_fm_cent_tz"
   hudi_config["hoodie.metadata.record.index.enable"] = "true"
   
   The upsert job throws the following error:
   
   An error was encountered:
   An error occurred while calling o2149.save.
   : org.apache.hudi.exception.HoodieException: Config conflict(key     current 
value   existing value):
   hoodie.database.name:        datalake_dev1_entp_cds
        at 
org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:211)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:177)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:104)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:123)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:554)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:554)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:530)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:97)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82)
        at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
   
   Traceback (most recent call last):
     File 
"/mnt1/yarn/usercache/livy/appcache/application_1706286757454_0017/container_1706286757454_0017_01_000001/pyspark.zip/pyspark/sql/readwriter.py",
 line 1398, in save
       self._jwrite.save(path)
     File 
"/mnt1/yarn/usercache/livy/appcache/application_1706286757454_0017/container_1706286757454_0017_01_000001/py4j-0.10.9.7-src.zip/py4j/java_gateway.py",
 line 1323, in __call__
       answer, self.gateway_client, self.target_id, self.name)
     File 
"/mnt1/yarn/usercache/livy/appcache/application_1706286757454_0017/container_1706286757454_0017_01_000001/pyspark.zip/pyspark/errors/exceptions/captured.py",
 line 169, in deco
       return f(*a, **kw)
     File 
"/mnt1/yarn/usercache/livy/appcache/application_1706286757454_0017/container_1706286757454_0017_01_000001/py4j-0.10.9.7-src.zip/py4j/protocol.py",
 line 328, in get_return_value
       format(target_id, ".", name), value)
   py4j.protocol.Py4JJavaError: An error occurred while calling o2149.save.
   : org.apache.hudi.exception.HoodieException: Config conflict(key     current 
value   existing value):
   hoodie.database.name:        datalake_dev1_entp_cds
        at 
org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:211)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:177)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:104)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:123)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:554)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:554)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:530)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:97)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82)
        at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to