piyushrl opened a new issue #1985:
URL: https://github.com/apache/hudi/issues/1985


   We backfilled data from a database using df.write.format("hudi"), and are 
now trying to upsert new data from Kafka into the same location and Hive table 
using DeltaStreamer. Once the backfill is done, DeltaStreamer fails with the 
error below when it tries to upsert new data.
   
   `20/08/19 15:21:31 ERROR deltastreamer.HoodieDeltaStreamer: Got error running delta sync once. Shutting down
   org.apache.hudi.utilities.exception.HoodieDeltaStreamerException: Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer. Last Commit :Option{val=[20200819143204__deltacommit__COMPLETED]}, Instants :[[20200819143204__deltacommit__COMPLETED]], CommitMetadata={
     "partitionToWriteStats" : {
         "2019-06-16" : [ {
         "fileId" : "da81b7ed-296b-4c3a-b3fe-823ded3e2e5f-0",
         "path" : "2019-06-16/da81b7ed-296b-4c3a-b3fe-823ded3e2e5f-0_0-20-22_20200819143204.parquet",
         "prevCommit" : "null",
         "numWrites" : 6,
         "numDeletes" : 0,
         "numUpdateWrites" : 0,
         "numInserts" : 6,
         "totalWriteBytes" : 461439,
         "totalWriteErrors" : 0,
         "tempPath" : null,
         "partitionPath" : "2019-06-16",
         "totalLogRecords" : 0,
         "totalLogFilesCompacted" : 0,
         "totalLogSizeCompacted" : 0,
         "totalUpdatedRecordsCompacted" : 0,
         "totalLogBlocks" : 0,
         "totalCorruptLogBlock" : 0,
         "totalRollbackBlocks" : 0,
         "fileSizeInBytes" : 461439
       } ],
       "2019-06-17" : [ {
         "fileId" : "423f6ccc-2391-4565-bf2e-ef9fc1354e6d-0",
         "path" : "2019-06-17/423f6ccc-2391-4565-bf2e-ef9fc1354e6d-0_1-20-23_20200819143204.parquet",
         "prevCommit" : "null",
         "numWrites" : 2251,
         "numDeletes" : 0,
         "numUpdateWrites" : 0,
         "numInserts" : 2251,
         "totalWriteBytes" : 787516,
         "totalWriteErrors" : 0,
         "tempPath" : null,
         "partitionPath" : "2019-06-17",
         "totalLogRecords" : 0,
         "totalLogFilesCompacted" : 0,
         "totalLogSizeCompacted" : 0,
         "totalUpdatedRecordsCompacted" : 0,
         "totalLogBlocks" : 0,
         "totalCorruptLogBlock" : 0,
         "totalRollbackBlocks" : 0,
         "fileSizeInBytes" : 787516
       } ]
     },
     "compacted" : false,
     "extraMetadata" : {
       "ROLLING_STAT" : "{\n  \"partitionToRollingStats\" : {\n    \"2019-06-16\" : {\n      \"da81b7ed-296b-4c3a-b3fe-823ded3e2e5f-0\" : {\n        \"fileId\" : \"da81b7ed-296b-4c3a-b3fe-823ded3e2e5f-0\",\n        \"inserts\" : 6,\n        \"upserts\" : 0,\n        \"deletes\" : 0,\n        \"totalInputWriteBytesToDisk\" : 0,\n        \"totalInputWriteBytesOnDisk\" : 461439\n      }\n    },\n    \"2019-06-17\" : {\n      \"423f6ccc-2391-4565-bf2e-ef9fc1354e6d-0\" : {\n        \"fileId\" : \"423f6ccc-2391-4565-bf2e-ef9fc1354e6d-0\",\n        \"inserts\" : 2251,\n        \"upserts\" : 0,\n        \"deletes\" : 0,\n        \"totalInputWriteBytesToDisk\" : 0,\n        \"totalInputWriteBytesOnDisk\" : 787516\n      }\n    }\n  },\n  \"actionType\" : \"deltacommit\"\n}",
       "schema" : "<SCHEMA>"
     },
     "operationType" : "UPSERT",
     "fileIdAndRelativePaths" : {
       "da81b7ed-296b-4c3a-b3fe-823ded3e2e5f-0" : "2019-06-16/da81b7ed-296b-4c3a-b3fe-823ded3e2e5f-0_0-20-22_20200819143204.parquet",
       "423f6ccc-2391-4565-bf2e-ef9fc1354e6d-0" : "2019-06-17/423f6ccc-2391-4565-bf2e-ef9fc1354e6d-0_1-20-23_20200819143204.parquet"
     },
     "totalRecordsDeleted" : 0,
     "totalLogRecordsCompacted" : 0,
     "totalScanTime" : 0,
     "totalCreateTime" : 0,
     "totalUpsertTime" : 0,
     "totalCompactedRecordsUpdated" : 0,
     "totalLogFilesCompacted" : 0,
     "totalLogFilesSize" : 0
   }
   at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:268)
   at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:226)
   at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:121)
   at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:294)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:498)
   at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)`
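
   One way to see why DeltaStreamer rejects the table is to check whether the
last commit's extraMetadata carries a checkpoint entry. The sketch below is
only an illustration: it assumes the entry is stored under the key
deltastreamer.checkpoint.key (the log above does not confirm this), and
find_checkpoint and the trimmed metadata are hypothetical stand-ins, not
Hudi APIs.

```python
import json

# Assumption: the metadata key DeltaStreamer looks for in a commit.
CHECKPOINT_KEY = "deltastreamer.checkpoint.key"


def find_checkpoint(commit_metadata_json):
    """Return the checkpoint stored in a commit's extraMetadata, or None."""
    metadata = json.loads(commit_metadata_json)
    extra = metadata.get("extraMetadata") or {}
    return extra.get(CHECKPOINT_KEY)


# Trimmed copy of the commit metadata printed in the error above; the
# ROLLING_STAT value is shortened here for readability.
backfill_commit = json.dumps({
    "compacted": False,
    "extraMetadata": {"ROLLING_STAT": "...", "schema": "<SCHEMA>"},
    "operationType": "UPSERT",
})

if find_checkpoint(backfill_commit) is None:
    print("last commit has no checkpoint entry")
```

   With the metadata from the error above, extraMetadata holds only
ROLLING_STAT and schema, so the lookup finds nothing, which matches the
"Unable to find previous checkpoint" message.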
   
   
   
   
   
   
   
   Since auto-commit is false by default in the hudi-spark path used by 
df.write.format("hudi"), I tried changing withAutoCommit(false) to 
withAutoCommit(true) where DataSourceUtil.java creates the HoodieWriteConfig 
object. The backfill job then failed with a FileAlreadyExistsException for the 
.deltacommit file. I tried this in a new directory and still got the same error.
   
   
   `20/08/19 11:29:07 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.hudi.exception.HoodieIOException: Failed to create file hdfs:/user/hive/warehouse/test.db/tbl_test/.hoodie/20200819112849.deltacommit
   org.apache.hudi.exception.HoodieIOException: Failed to create file hdfs:/user/hive/warehouse/test.db/tbl_test/.hoodie/20200819112849.deltacommit
   at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:437)
   at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:327)
   at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:145)
   at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:165)
   at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:111)
   at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:102)
   at org.apache.hudi.HoodieSparkSqlWriter$.checkWriteStatus(HoodieSparkSqlWriter.scala:259)
   at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:181)
   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:108)
   at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
   at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
   at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
   at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
   at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
   at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
   at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
   at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
   at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
   at org.test.dp.connectors.HudiSink$.writeToHudi(HudiSink.scala:34)
   at org.test.dp.HistoryBackfillSparkJob$.main(HistoryBackfillSparkJob.scala:47)
   at org.test.dp.HistoryBackfillSparkJob.main(HistoryBackfillSparkJob.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:498)
   at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)
   Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /user/hive/warehouse/test.db/tbl_test/.hoodie/20200819112849.deltacommit for client <ip> already exists
   at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.startFile(FSDirWriteFileOp.java:388)`
   
   
   
   Once I backfill data using hudi-spark, is there any way for DeltaStreamer 
to read the existing metadata (or something similar) in that same HDFS 
location and Hive table and start the daily upserts from Kafka?
   

