shubham-bungee opened a new issue, #6389:
URL: https://github.com/apache/hudi/issues/6389
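**Error while loading incremental data (upsert)**
The incremental upsert fails in `o605.save` with `Failed to upsert for commit time
20220813064526092` (the stack trace below was captured from a run with commit time 20220812223251416):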
```
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o605.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220812223251416
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:63)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:119)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:160)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:217)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:277)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 47 in stage 8.0 failed 4 times, most recent failure: Lost task 47.3 in stage 8.0 (TID 1833)
(162.44.118.51 executor 7):
org.apache.hudi.exception.HoodieException: score,capture_date(Part -score,capture_date) field not found in record. Acceptable fields were :
[column1,column2, .................., score, capture_date ]
```
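**Both `score` (double) and `capture_date` (timestamp, not null) are present in the column list**, and the error message itself lists them among the acceptable fields. The upsert still fails.

Note that the error reports `score,capture_date` as a *single* field name. This suggests the comma-separated value of `hoodie.datasource.write.precombine.field` is looked up as one column named `score,capture_date`, rather than being split into two columns.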
**Config used:**

```python
commonConfig = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': "score,capture_date",  #### USING TWO COLUMNS ######
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.table.name': 'sales',
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': 'sales',
    'hoodie.datasource.hive_sync.table': 'sales',
    'hoodie.datasource.hive_sync.enable': 'true',
    'path': "s3://datawarehouse/DATA/DEV/gold/sales/",
    'hoodie.index.type': 'GLOBAL_SIMPLE',
    'hoodie.simple.index.update.partition.path': 'true',
    'hoodie.global.simple.index.parallelism': '20'
}

partitionDataConfig = {
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.partitionpath.field': "country,zipcode",
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields': "country,zipcode",
    'hoodie.datasource.write.hive_style_partitioning': 'true'
}

#USED FOR FIRST TIME BULK INSERT#
initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': 20,
                  'hoodie.datasource.write.operation': 'bulk_insert'}

incrementalWriteConfig = {
    'hoodie.upsert.shuffle.parallelism': 20,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 5
}

upsertConf = {**commonConfig,
              **partitionDataConfig,
              **incrementalWriteConfig
              }

df.write.format("org.apache.hudi").options(**upsertConf).mode('append').save()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]