SudhirSaxena opened a new issue, #10587: URL: https://github.com/apache/hudi/issues/10587
Hi, I am facing issues with upsert mode in Hudi 0.14 RLI on EMR 6.15 / Spark 3.4.1 using the "Record Level Index". I see insert mode working as expected, but upsert mode is not working against an existing Hudi table (written with a lower Hudi version) and is hitting several issues. In short, I am getting a HoodieException — config conflict (key, current value, existing value) — for hoodie.database.name. I am supplying the same database name (the Hudi DB where the data is stored) both as the target and in the source Hudi config, and when I try to upsert it throws this hoodie.database.name conflict. Can you please have a look and help me fix this issue? https://apache-hudi.slack.com/files/U06FTGAHNUD/F06FXBENRFU/img_4074.jpg https://apache-hudi.slack.com/files/U06FTGAHNUD/F06FZQRTT2Q/img_4073.jpg Below is the code snippet: hudi_config = { "className":"org.apache.hudi", "hoodie.table.name": tgt_tbl, "hoodie.datasource.write.recordkey.field": "id", "hoodie.datasource.write.precombine.field": "eff_fm_cent_tz", "hoodie.datasource.write.operation": "upsert", "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator", "hoodie.datasource.write.partitionpath.field": "year,month", "hoodie.datasource.hive_sync.support_timestamp": "true", "hoodie.datasource.hive_sync.enable": "true", "hoodie.datasource.hive_sync.assume_date_partitioning": "false", "hoodie.datasource.hive_sync.table": tgt_tbl, "hoodie.datasource.hive_sync.use_jdbc": "false", "hoodie.datasource.hive_sync.mode": "hms", "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.upsert.shuffle.parallelism": hudi_upsert_parallelism, "hoodie.delete.shuffle.parallelism": hudi_delete_parallelism, "hoodie.bulkinsert.sort.mode": "GLOBAL_SORT", "hoodie.metadata.enable": "true", "hoodie.metadata.record.index.enable": "true", "hoodie.index.type": "RECORD_INDEX", 
"hoodie.metadata.index.column.stats.column.list": "res_sys_id,pnr_rec_loc_id,pnr_cre_dt", "hoodie.enable.data.skipping": "true" } hudi_delete_config = { "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.EmptyHoodieRecordPayload" } # Initialize local s3 if (files_exist == True): print("upsert started") hudi_operation = "upsert" hudi_write_mode = "append" hudi_config["className"] = "org.apache.hudi" hudi_config["hoodie.table.keygenerator.class"] = "org.apache.hudi.keygen.ComplexKeyGenerator" hudi_config["hoodie.database.name"] = tgt_db hudi_config["hoodie.table.name"] = tgt_tbl hudi_config["hoodie.datasource.write.recordkey.field"] = "id" hudi_config["hoodie.datasource.write.operation"] = hudi_operation hudi_config["hoodie.datasource.hive_sync.table"] = tgt_tbl hudi_config["hoodie.datasource.hive_sync.database"] = tgt_db hudi_config["hoodie.index.type"] = "RECORD_INDEX" hudi_config["hoodie.metadata.enable"] = "true" hudi_config["hoodie.datasource.write.table.name"] = tgt_tbl hudi_config["hoodie.datasource.write.precombine.field"] = "eff_fm_cent_tz" hudi_config["hoodie.metadata.record.index.enable"] = "true" print("Upserting records into " + tgt_tbl + " Hudi table") res_pnr_concat.write.format("org.apache.hudi").options(**hudi_config).mode("Append").save(hudi_tbl_path + tgt_tbl) print("Successfully process records for " + tgt_tbl + " Hudi table") print("res_pnr hudi loading for upsert ended ----",datetime.now()) if len(target_records) > 0: print("delete logic started ----",datetime.now()) res_pnr_concat.createOrReplaceTempView("res_pnr_concat") res_pnr_del_df=spark.sql("select distinct A.* from res_pnr_cached A where not exists(select 1 from res_pnr_concat B where A.RES_SYS_ID = B.RES_SYS_ID AND A.PNR_REC_LOC_ID = B.PNR_REC_LOC_ID AND A.PNR_CRE_DT = B.PNR_CRE_DT AND A.EFF_FM_CENT_TZ = B.eff_fm_cent_tz and A.year = B.year and A.month = 
B.month)").drop("src_hoodie_record_key").drop("_hoodie_commit_seqno").drop("_hoodie_commit_time").drop("_hoodie_file_name").drop("_hoodie_partition_path").drop("_hoodie_record_key") if len(res_pnr_del_df.take(1)) > 0: common_config = {**hudi_config, **hudi_delete_config} spark.sql("uncache table if exists res_pnr_src_df") print("Deleting records from RES_PNR Hudi table") res_pnr_del_df.write.format("org.apache.hudi").options(**common_config).mode("append").save(hudi_tbl_path + tgt_tbl) else: print("Delete eligible records are NOT identified") print("delete logic Ended ----",datetime.now()) else: pass print("res_pnr hudi upsert logic ended ----",datetime.now()) else: hudi_operation = "bulk_insert" hudi_write_mode = "overwrite" hudi_config["className"] = "org.apache.hudi" hudi_config["hoodie.table.name"] = tgt_tbl hudi_config["hoodie.database.name"] = tgt_db hudi_config["hoodie.datasource.write.operation"] = hudi_operation hudi_config["hoodie.datasource.hive_sync.table"] = tgt_tbl hudi_config["hoodie.datasource.write.recordkey.field"] = "id" hudi_config["hoodie.datasource.hive_sync.database"] = tgt_db hudi_config["hoodie.index.type"] = "RECORD_INDEX" hudi_config["hoodie.metadata.enable"] = "true" hudi_config["hoodie.datasource.write.table.name"] = tgt_tbl hudi_config["hoodie.datasource.write.precombine.field"] = "eff_fm_cent_tz" hudi_config["hoodie.metadata.record.index.enable"] = "true" print("bulk insert " + tgt_tbl + " Hudi table") res_pnr_concat.write.format("org.apache.hudi").options(**hudi_config).mode(hudi_write_mode).save(hudi_tbl_path + tgt_tbl) print("Successfully bulk upserted records into " + tgt_tbl + " Hudi table") print("res_pnr hudi loading for upsert ended ----",datetime.now()) **Upsert mode config:** hudi_operation = "upsert" hudi_write_mode = "append" hudi_config["className"] = "org.apache.hudi" hudi_config["hoodie.table.keygenerator.class"] = "org.apache.hudi.keygen.ComplexKeyGenerator" hudi_config["hoodie.table.name"] = tgt_tbl 
hudi_config["hoodie.database.name"] = tgt_db hudi_config["hoodie.datasource.write.recordkey.field"] = "id" hudi_config["hoodie.datasource.write.operation"] = hudi_operation hudi_config["hoodie.datasource.hive_sync.table"] = tgt_tbl hudi_config["hoodie.datasource.hive_sync.database"] = tgt_db hudi_config["hoodie.index.type"] = "RECORD_INDEX" hudi_config["hoodie.datasource.write.table.name"] = tgt_tbl hudi_config["hoodie.datasource.write.precombine.field"] = "eff_fm_cent_tz" hudi_config["hoodie.metadata.record.index.enable"] = "true" Upsert method job is throwing an error: An error was encountered: An error occurred while calling o2149.save. : org.apache.hudi.exception.HoodieException: Config conflict(key current value existing value): hoodie.database.name: datalake_dev1_entp_cds at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:211) at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:177) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:104) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:123) at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:554) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:554) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:530) at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:97) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:84) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:750) Traceback (most recent call last): File "/mnt1/yarn/usercache/livy/appcache/application_1706286757454_0017/container_1706286757454_0017_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 1398, in save self._jwrite.save(path) File "/mnt1/yarn/usercache/livy/appcache/application_1706286757454_0017/container_1706286757454_0017_01_000001/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1323, in __call__ answer, self.gateway_client, self.target_id, self.name) File 
"/mnt1/yarn/usercache/livy/appcache/application_1706286757454_0017/container_1706286757454_0017_01_000001/pyspark.zip/pyspark/errors/exceptions/captured.py", line 169, in deco return f(*a, **kw) File "/mnt1/yarn/usercache/livy/appcache/application_1706286757454_0017/container_1706286757454_0017_01_000001/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o2149.save. : org.apache.hudi.exception.HoodieException: Config conflict(key current value existing value): hoodie.database.name: datalake_dev1_entp_cds at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:211) at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:177) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:104) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:123) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:554) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:554) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:530) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:97) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:84) at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:750) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
