[ 
https://issues.apache.org/jira/browse/HUDI-6814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-6814:
----------------------------
    Component/s: table-service

> Streaming ingestion using bulk_insert in GCP intermittantly fails w/ 412 
> pre-condition error
> --------------------------------------------------------------------------------------------
>
>                 Key: HUDI-6814
>                 URL: https://issues.apache.org/jira/browse/HUDI-6814
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: table-service, writer-core
>            Reporter: sivabalan narayanan
>            Priority: Major
>
> I have a streaming ingestion (consuming from streaming source and writing to 
> hudi using forEachBatch) using bulk_insert as operation type. 
> Occasionally I hit below exception. 
> {code:java}
> 23/08/26 03:25:20 WARN org.apache.hudi.client.RunsTableService: Table 
> services are disabled. Set `Key: 'hoodie.table.services.enabled' , default: 
> true description: Master control to disable all table services including 
> archive, clean, compact, cluster, etc. since version: 0.11.0 deprecated 
> after: version is not defined)` to enable.
> 23/08/26 03:25:20 WARN org.apache.hudi.client.RunsTableService: Table 
> services are disabled. Set `Key: 'hoodie.table.services.enabled' , default: 
> true description: Master control to disable all table services including 
> archive, clean, compact, cluster, etc. since version: 0.11.0 deprecated 
> after: version is not defined)` to enable.
> 23/08/26 03:25:20 WARN org.apache.hudi.client.RunsTableService: Table 
> services are disabled. Set `Key: 'hoodie.table.services.enabled' , default: 
> true description: Master control to disable all table services including 
> archive, clean, compact, cluster, etc. since version: 0.11.0 deprecated 
> after: version is not defined)` to enable.
> 15636 ms to finish
> 1693020605035 Start writing cow table                                         
>   
> 23/08/26 03:30:05 ERROR 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 
> e6d5c1fc-f537-463d-bb0f-66b45c240612, runId = 
> c75e1336-d815-4f3c-bdeb-a56a7b4d6100] terminated with error
> org.apache.hudi.exception.HoodieHeartbeatException: Unable to generate 
> heartbeat 
>         at 
> org.apache.hudi.client.heartbeat.HoodieHeartbeatClient.updateHeartbeat(HoodieHeartbeatClient.java:270)
>         at 
> org.apache.hudi.client.heartbeat.HoodieHeartbeatClient.start(HoodieHeartbeatClient.java:177)
>         at 
> org.apache.hudi.client.BaseHoodieWriteClient.startCommit(BaseHoodieWriteClient.java:1010)
>         at 
> org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:998)
>         at 
> org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:981)
>         at 
> org.apache.hudi.internal.DataSourceInternalWriterHelper.<init>(DataSourceInternalWriterHelper.java:66)
>         at 
> org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite.<init>(HoodieDataSourceInternalBatchWrite.java:64)
>         at 
> org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWriteBuilder.buildForBatch(HoodieDataSourceInternalBatchWriteBuilder.java:61)
>         at 
> org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
>         at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
>         at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
>         at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
>         at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
>         at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
>         at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
>         at 
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
>         at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
>         at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
>         at 
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>         at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>         at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
>         at 
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:370)
>         at 
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
>         at 
> org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:607)
>         at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:185)
>         at 
> org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
>         at 
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
>         at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>         at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>         at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
>         at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
>         at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
>         at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
>         at 
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
>         at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
>         at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
>         at 
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>         at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>         at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
>         at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
>         at 
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
>         at 
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
>         at 
> $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$query$1(<console>:69)
>         at 
> $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$query$1$adapted(<console>:45)
>         at 
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>         at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>         at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
>         at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
>         at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
>         at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>         at 
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
> Caused by: java.io.IOException: Upload failed for 
> 'ABCD_PATH/multi-writer-testing/tbl_bi/.hoodie/.heartbeat/20230826033005272'
>         at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(BaseAbstractGoogleAsyncWriteChannel.java:260)
>         at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.close(BaseAbstractGoogleAsyncWriteChannel.java:168)
>         at java.nio.channels.Channels$1.close(Channels.java:178)
>         at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
>         at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.close(GoogleHadoopOutputStream.java:119)
>         at 
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>         at 
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>         at 
> org.apache.hudi.client.heartbeat.HoodieHeartbeatClient.updateHeartbeat(HoodieHeartbeatClient.java:258)
>         ... 78 more
> Caused by: 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException:
>  412 Precondition Failed
> PUT 
> https://storage.googleapis.com/upload/storage/v1/b/ABCD_PATH/o?ifGenerationMatch=0&name=multi-writer-testing/tbl_bi/.hoodie/.heartbeat/20230826033005272&uploadType=resumable&upload_id=ADPycdvJgoESWSRlT-ZacrP1zxfT40GEz8FmOFh0zzyduDiHGVJbYHznxXSvD-B0aE_yioDNPrNnMDvQUCjqCD_tcDeiM3f3YwQJ
> {
>   "code" : 412,
>   "errors" : [ {
>     "domain" : "global",
>     "location" : "If-Match",
>     "locationType" : "header",
>     "message" : "At least one of the pre-conditions you specified did not 
> hold.",
>     "reason" : "conditionNotMet"
>   } ],
>   "message" : "At least one of the pre-conditions you specified did not hold."
> }
>         at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
>         at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
>         at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
>         at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:543)
>         at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:466)
>         at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:576)
>         at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:85)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:750)
> org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 
> e6d5c1fc-f537-463d-bb0f-66b45c240612, runId = 
> c75e1336-d815-4f3c-bdeb-a56a7b4d6100] terminated with exception: Unable to 
> generate heartbeat
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:356)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
> Caused by: org.apache.hudi.exception.HoodieHeartbeatException: Unable to 
> generate heartbeat
>   at 
> org.apache.hudi.client.heartbeat.HoodieHeartbeatClient.updateHeartbeat(HoodieHeartbeatClient.java:270)
>   at 
> org.apache.hudi.client.heartbeat.HoodieHeartbeatClient.start(HoodieHeartbeatClient.java:177)
>   at 
> org.apache.hudi.client.BaseHoodieWriteClient.startCommit(BaseHoodieWriteClient.java:1010)
>   at 
> org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:998)
>   at 
> org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:981)
>   at 
> org.apache.hudi.internal.DataSourceInternalWriterHelper.<init>(DataSourceInternalWriterHelper.java:66)
>   at 
> org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite.<init>(HoodieDataSourceInternalBatchWrite.java:64)
>   at 
> org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWriteBuilder.buildForBatch(HoodieDataSourceInternalBatchWriteBuilder.java:61)
>   at 
> org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
>   at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
>   at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
>   at 
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
>   at 
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
>   at 
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:370)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
>   at 
> org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:607)
>   at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:185)
>   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
>   at 
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>   at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
>   at 
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
>   at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
>   at 
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
>   at $anonfun$query$1(<console>:69)
>   at $anonfun$query$1$adapted(<console>:45)
>   at 
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
>   at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
>   ... 1 more
> Caused by: java.io.IOException: Upload failed for 
> 'ABCD_PATH/multi-writer-testing/tbl_bi/.hoodie/.heartbeat/20230826033005272'
>   at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(BaseAbstractGoogleAsyncWriteChannel.java:260)
>   at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.close(BaseAbstractGoogleAsyncWriteChannel.java:168)
>   at java.nio.channels.Channels$1.close(Channels.java:178)
>   at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
>   at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.close(GoogleHadoopOutputStream.java:119)
>   at 
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>   at 
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>   at 
> org.apache.hudi.client.heartbeat.HoodieHeartbeatClient.updateHeartbeat(HoodieHeartbeatClient.java:258)
>   ... 78 more
> Caused by: 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException:
>  412 Precondition Failed
> PUT 
> https://storage.googleapis.com/upload/storage/v1/b/ABCD_PATH/o?ifGenerationMatch=0&name=multi-writer-testing/tbl_bi_mw/.hoodie/.heartbeat/20230826033005272&uploadType=resumable&upload_id=ADPycdvJgoESWSRlT-ZacrP1zxfT40GEz8FmOFh0zzyduDiHGVJbYHznxXSvD-B0aE_yioDNPrNnMDvQUCjqCD_tcDeiM3f3YwQJ
> {
>   "code" : 412,
>   "errors" : [ {
>     "domain" : "global",
>     "location" : "If-Match",
>     "locationType" : "header",
>     "message" : "At least one of the pre-conditions you specified did not 
> hold.",
>     "reason" : "conditionNotMet"
>   } ],
>   "message" : "At least one of the pre-conditions you specified did not hold."
> }
>   at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
>   at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
>   at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
>   at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:543)
>   at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:466)
>   at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:576)
>   at 
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:85)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:750) {code}
>  
> On restart, pipeline resumes w/o any issues. 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to