piyushrl opened a new issue #1985:
URL: https://github.com/apache/hudi/issues/1985
We backfilled data from a database using `df.write.format("hudi")`, and are now trying to upsert new data from Kafka into the same location and Hive table using DeltaStreamer. Once the backfill is done, DeltaStreamer fails with the error below when it tries to upsert new data:
`20/08/19 15:21:31 ERROR deltastreamer.HoodieDeltaStreamer: Got error running delta sync once. Shutting down
org.apache.hudi.utilities.exception.HoodieDeltaStreamerException: Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :Option{val=[20200819143204__deltacommit__COMPLETED]}, Instants :[[20200819143204__deltacommit__COMPLETED]], CommitMetadata={
"partitionToWriteStats" : {
"2019-06-16" : [ {
"fileId" : "da81b7ed-296b-4c3a-b3fe-823ded3e2e5f-0",
"path" : "2019-06-16/da81b7ed-296b-4c3a-b3fe-823ded3e2e5f-0_0-20-22_20200819143204.parquet",
"prevCommit" : "null",
"numWrites" : 6,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 6,
"totalWriteBytes" : 461439,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "2019-06-16",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 461439
} ],
"2019-06-17" : [ {
"fileId" : "423f6ccc-2391-4565-bf2e-ef9fc1354e6d-0",
"path" : "2019-06-17/423f6ccc-2391-4565-bf2e-ef9fc1354e6d-0_1-20-23_20200819143204.parquet",
"prevCommit" : "null",
"numWrites" : 2251,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 2251,
"totalWriteBytes" : 787516,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "2019-06-17",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 787516
} ]
},
"compacted" : false,
"extraMetadata" : {
"ROLLING_STAT" : "{\n \"partitionToRollingStats\" : {\n
\"2019-06-16\" : {\n \"da81b7ed-296b-4c3a-b3fe-823ded3e2e5f-0\" : {\n
\"fileId\" : \"da81b7ed-296b-4c3a-b3fe-823ded3e2e5f-0\",\n \"inserts\"
: 6,\n \"upserts\" : 0,\n \"deletes\" : 0,\n
\"totalInputWriteBytesToDisk\" : 0,\n \"totalInputWriteBytesOnDisk\" :
461439\n }\n },\n \"2019-06-17\" : {\n
\"423f6ccc-2391-4565-bf2e-ef9fc1354e6d-0\" : {\n \"fileId\" :
\"423f6ccc-2391-4565-bf2e-ef9fc1354e6d-0\",\n \"inserts\" : 2251,\n
\"upserts\" : 0,\n \"deletes\" : 0,\n
\"totalInputWriteBytesToDisk\" : 0,\n \"totalInputWriteBytesOnDisk\" :
787516\n }\n }\n },\n \"actionType\" : \"deltacommit\"\n}",
"schema" : "<SCHEMA>"
},
"operationType" : "UPSERT",
"fileIdAndRelativePaths" : {
"da81b7ed-296b-4c3a-b3fe-823ded3e2e5f-0" : "2019-06-16/da81b7ed-296b-4c3a-b3fe-823ded3e2e5f-0_0-20-22_20200819143204.parquet",
"423f6ccc-2391-4565-bf2e-ef9fc1354e6d-0" : "2019-06-17/423f6ccc-2391-4565-bf2e-ef9fc1354e6d-0_1-20-23_20200819143204.parquet"
},
"totalRecordsDeleted" : 0,
"totalLogRecordsCompacted" : 0,
"totalScanTime" : 0,
"totalCreateTime" : 0,
"totalUpsertTime" : 0,
"totalCompactedRecordsUpdated" : 0,
"totalLogFilesCompacted" : 0,
"totalLogFilesSize" : 0
}
at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:268)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:226)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:121)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:294)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)`
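For context: the exception is raised when DeltaStreamer cannot find a checkpoint to resume from in the metadata of the last completed commit. In the commit metadata above, `extraMetadata` contains only `ROLLING_STAT` and `schema`, so there is nothing to resume from. The sketch below is a simplified, hypothetical model of that lookup, not Hudi's actual `DeltaSync` code. It assumes the checkpoint is stored under the key `deltastreamer.checkpoint.key` and that a value passed explicitly via `--checkpoint` takes precedence; both are assumptions to check against the Hudi version in use.

```java
import java.util.Map;

/** Simplified, hypothetical model of how DeltaStreamer picks a resume checkpoint. */
public class CheckpointLookup {

  // Assumed metadata key under which DeltaStreamer-written commits store the source checkpoint.
  static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

  /**
   * @param cliCheckpoint value passed via --checkpoint, or null if none was given
   * @param lastCommitExtraMetadata extraMetadata of the latest completed commit
   */
  static String resolveResumeCheckpoint(String cliCheckpoint, Map<String, String> lastCommitExtraMetadata) {
    if (cliCheckpoint != null) {
      return cliCheckpoint; // an explicit checkpoint overrides the commit metadata
    }
    String stored = lastCommitExtraMetadata.get(CHECKPOINT_KEY);
    if (stored != null) {
      return stored; // normal case: the last commit was written by DeltaStreamer
    }
    // A commit written by the Spark datasource (like the backfill) has no checkpoint entry.
    throw new IllegalStateException("Unable to find previous checkpoint");
  }

  public static void main(String[] args) {
    // Mirrors the backfill commit above: only ROLLING_STAT and schema are present.
    Map<String, String> backfillCommit = Map.of("ROLLING_STAT", "<rolling stats JSON>", "schema", "<SCHEMA>");
    try {
      resolveResumeCheckpoint(null, backfillCommit);
    } catch (IllegalStateException e) {
      System.out.println("no --checkpoint: " + e.getMessage());
    }
    // "my_topic,0:100" is a made-up checkpoint value.
    System.out.println("with --checkpoint: " + resolveResumeCheckpoint("my_topic,0:100", backfillCommit));
  }
}
```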
Since auto-commit is disabled by default when writing through the hudi-spark datasource (`df.write.format("hudi")`), I tried changing `withAutoCommit(false)` to `withAutoCommit(true)` in `DataSourceUtils.java`, where the `HoodieWriteConfig` object is created. With that change, the backfill job itself failed with a `FileAlreadyExistsException` for the `.deltacommit` file. I tried this in a new directory and still got the same error:
`20/08/19 11:29:07 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.hudi.exception.HoodieIOException: Failed to create file hdfs:/user/hive/warehouse/test.db/tbl_test/.hoodie/20200819112849.deltacommit
org.apache.hudi.exception.HoodieIOException: Failed to create file hdfs:/user/hive/warehouse/test.db/tbl_test/.hoodie/20200819112849.deltacommit
at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:437)
at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:327)
at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:145)
at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:165)
at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:111)
at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:102)
at org.apache.hudi.HoodieSparkSqlWriter$.checkWriteStatus(HoodieSparkSqlWriter.scala:259)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:181)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:108)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
at org.test.dp.connectors.HudiSink$.writeToHudi(HudiSink.scala:34)
at org.test.dp.HistoryBackfillSparkJob$.main(HistoryBackfillSparkJob.scala:47)
at org.test.dp.HistoryBackfillSparkJob.main(HistoryBackfillSparkJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /user/hive/warehouse/test.db/tbl_test/.hoodie/20200819112849.deltacommit for client <ip> already exists
at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.startFile(FSDirWriteFileOp.java:388)`
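The trace above shows the failing commit coming from `HoodieSparkSqlWriter.checkWriteStatus`, which calls `commit()` on the write client itself. A plausible reading (an assumption, not verified against the Hudi source) is that with `withAutoCommit(true)` the write client has already completed the instant during the write, so the datasource's explicit commit tries to create the same `<instant>.deltacommit` file a second time. That would also explain why a fresh directory does not help. The sketch below models only this failure mode, using the JDK's `java.nio.file.Files.createFile`, which likewise refuses to overwrite and throws `java.nio.file.FileAlreadyExistsException` (the JDK class, not Hadoop's). The `commit` helper is hypothetical.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical model of a timeline that refuses to re-create a completed instant file. */
public class DoubleCommitDemo {

  // Hypothetical helper: "completing" an instant creates <instant>.deltacommit, failing if it exists.
  static void commit(Path hoodieDir, String instant) throws IOException {
    Files.createFile(hoodieDir.resolve(instant + ".deltacommit"));
  }

  /** Returns true if the datasource's explicit commit is rejected because the file already exists. */
  static boolean secondCommitFails(boolean autoCommit) throws IOException {
    Path hoodieDir = Files.createTempDirectory("hoodie"); // always a fresh directory
    String instant = "20200819112849";
    try {
      if (autoCommit) {
        commit(hoodieDir, instant); // the write client commits as part of the write
      }
      commit(hoodieDir, instant); // the datasource writer then commits explicitly
      return false;
    } catch (FileAlreadyExistsException e) {
      return true;
    } finally {
      // Clean up the temporary "timeline".
      Files.deleteIfExists(hoodieDir.resolve(instant + ".deltacommit"));
      Files.deleteIfExists(hoodieDir);
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println("autoCommit=false, explicit commit fails? " + secondCommitFails(false));
    System.out.println("autoCommit=true,  explicit commit fails? " + secondCommitFails(true));
  }
}
```

If this reading is right, enabling auto-commit in the datasource path cannot work on its own, because the datasource always performs its own commit afterwards.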
Is there any way to backfill data with the hudi-spark datasource and then have DeltaStreamer, pointed at the same HDFS location and Hive table, pick up the existing metadata (or something equivalent) and start daily upserts from Kafka?
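One direction that may help, unverified for this setup: DeltaStreamer has a `--checkpoint` option that resumes from an explicitly supplied checkpoint instead of one read from commit metadata. For the Kafka sources, the checkpoint string appears to be the topic name followed by comma-separated `partition:offset` pairs, where each offset is the next one to consume. Treat that format as an assumption to confirm against the Hudi version in use. The helper below is hypothetical; it only builds such a string from per-partition offsets, for example the offsets that were current when the backfill snapshot was taken. The topic name and offsets are made up.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical helper: builds a checkpoint in the assumed "topic,partition:offset,..." format. */
public class KafkaCheckpoint {

  static String toCheckpoint(String topic, Map<Integer, Long> nextOffsetByPartition) {
    if (nextOffsetByPartition.isEmpty()) {
      throw new IllegalArgumentException("at least one partition is required");
    }
    // Sort by partition number so the output is deterministic.
    String pairs = new TreeMap<>(nextOffsetByPartition).entrySet().stream()
        .map(e -> e.getKey() + ":" + e.getValue())
        .collect(Collectors.joining(","));
    return topic + "," + pairs;
  }

  public static void main(String[] args) {
    // Made-up topic and offsets.
    String cp = toCheckpoint("my_topic", Map.of(1, 980L, 0, 1200L));
    System.out.println(cp); // my_topic,0:1200,1:980
  }
}
```

The resulting string would then be passed on the first DeltaStreamer run (e.g. `--checkpoint "my_topic,0:1200,1:980"`). If this works as assumed, later runs should find the checkpoint that DeltaStreamer writes into its own commit metadata, so it is probably safest to pass `--checkpoint` only once.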