[
https://issues.apache.org/jira/browse/HUDI-6814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ethan Guo updated HUDI-6814:
----------------------------
Component/s: table-service
> Streaming ingestion using bulk_insert in GCP intermittently fails with 412
> precondition error
> --------------------------------------------------------------------------------------------
>
> Key: HUDI-6814
> URL: https://issues.apache.org/jira/browse/HUDI-6814
> Project: Apache Hudi
> Issue Type: Bug
> Components: table-service, writer-core
> Reporter: sivabalan narayanan
> Priority: Major
>
> I have a streaming ingestion pipeline (consuming from a streaming source and
> writing to Hudi via foreachBatch) that uses bulk_insert as the operation type.
> Occasionally it fails with the exception below.
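> For reference, a minimal sketch of the kind of foreachBatch write described
> above (table name, path, trigger, source, and record/precombine fields are
> placeholders, not taken from the actual job):
> {code:scala}
> // Hypothetical reproduction sketch: a streaming query whose foreachBatch sink
> // performs a Hudi bulk_insert per micro-batch. All names below are placeholders.
> import org.apache.spark.sql.{DataFrame, SparkSession}
> import org.apache.spark.sql.streaming.Trigger
>
> val spark = SparkSession.builder().appName("hudi-bulk-insert-stream").getOrCreate()
>
> // Placeholder streaming source; the real job consumes from an actual stream.
> val source: DataFrame = spark.readStream.format("rate").load()
>
> val query = source.writeStream
>   .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>     batchDF.write
>       .format("hudi")
>       .option("hoodie.datasource.write.operation", "bulk_insert")
>       .option("hoodie.datasource.write.recordkey.field", "value")      // placeholder
>       .option("hoodie.datasource.write.precombine.field", "timestamp") // placeholder
>       .option("hoodie.table.name", "tbl_bi")
>       .mode("append")
>       .save("gs://ABCD_PATH/multi-writer-testing/tbl_bi")
>   }
>   .trigger(Trigger.ProcessingTime("1 minute"))
>   .start()
> {code}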
> {code:java}
> 23/08/26 03:25:20 WARN org.apache.hudi.client.RunsTableService: Table
> services are disabled. Set `Key: 'hoodie.table.services.enabled' , default:
> true description: Master control to disable all table services including
> archive, clean, compact, cluster, etc. since version: 0.11.0 deprecated
> after: version is not defined)` to enable.
> 23/08/26 03:25:20 WARN org.apache.hudi.client.RunsTableService: Table
> services are disabled. Set `Key: 'hoodie.table.services.enabled' , default:
> true description: Master control to disable all table services including
> archive, clean, compact, cluster, etc. since version: 0.11.0 deprecated
> after: version is not defined)` to enable.
> 23/08/26 03:25:20 WARN org.apache.hudi.client.RunsTableService: Table
> services are disabled. Set `Key: 'hoodie.table.services.enabled' , default:
> true description: Master control to disable all table services including
> archive, clean, compact, cluster, etc. since version: 0.11.0 deprecated
> after: version is not defined)` to enable.
> 15636 ms to finish
> 1693020605035 Start writing cow table
>
> 23/08/26 03:30:05 ERROR
> org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id =
> e6d5c1fc-f537-463d-bb0f-66b45c240612, runId =
> c75e1336-d815-4f3c-bdeb-a56a7b4d6100] terminated with error
> org.apache.hudi.exception.HoodieHeartbeatException: Unable to generate
> heartbeat
> at
> org.apache.hudi.client.heartbeat.HoodieHeartbeatClient.updateHeartbeat(HoodieHeartbeatClient.java:270)
> at
> org.apache.hudi.client.heartbeat.HoodieHeartbeatClient.start(HoodieHeartbeatClient.java:177)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.startCommit(BaseHoodieWriteClient.java:1010)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:998)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:981)
> at
> org.apache.hudi.internal.DataSourceInternalWriterHelper.<init>(DataSourceInternalWriterHelper.java:66)
> at
> org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite.<init>(HoodieDataSourceInternalBatchWrite.java:64)
> at
> org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWriteBuilder.buildForBatch(HoodieDataSourceInternalBatchWriteBuilder.java:61)
> at
> org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
> at
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
> at
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
> at
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
> at
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
> at
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:370)
> at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:607)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:185)
> at
> org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
> at
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
> at
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
> at
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
> at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
> at
> $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$query$1(<console>:69)
> at
> $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$query$1$adapted(<console>:45)
> at
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
> Caused by: java.io.IOException: Upload failed for
> 'ABCD_PATH/multi-writer-testing/tbl_bi/.hoodie/.heartbeat/20230826033005272'
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(BaseAbstractGoogleAsyncWriteChannel.java:260)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.close(BaseAbstractGoogleAsyncWriteChannel.java:168)
> at java.nio.channels.Channels$1.close(Channels.java:178)
> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
> at
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.close(GoogleHadoopOutputStream.java:119)
> at
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> at
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
> at
> org.apache.hudi.client.heartbeat.HoodieHeartbeatClient.updateHeartbeat(HoodieHeartbeatClient.java:258)
> ... 78 more
> Caused by:
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException:
> 412 Precondition Failed
> PUT
> https://storage.googleapis.com/upload/storage/v1/b/ABCD_PATH/o?ifGenerationMatch=0&name=multi-writer-testing/tbl_bi/.hoodie/.heartbeat/20230826033005272&uploadType=resumable&upload_id=ADPycdvJgoESWSRlT-ZacrP1zxfT40GEz8FmOFh0zzyduDiHGVJbYHznxXSvD-B0aE_yioDNPrNnMDvQUCjqCD_tcDeiM3f3YwQJ
> {
> "code" : 412,
> "errors" : [ {
> "domain" : "global",
> "location" : "If-Match",
> "locationType" : "header",
> "message" : "At least one of the pre-conditions you specified did not
> hold.",
> "reason" : "conditionNotMet"
> } ],
> "message" : "At least one of the pre-conditions you specified did not hold."
> }
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:543)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:466)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:576)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:85)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> org.apache.spark.sql.streaming.StreamingQueryException: Query [id =
> e6d5c1fc-f537-463d-bb0f-66b45c240612, runId =
> c75e1336-d815-4f3c-bdeb-a56a7b4d6100] terminated with exception: Unable to
> generate heartbeat
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:356)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
> Caused by: org.apache.hudi.exception.HoodieHeartbeatException: Unable to
> generate heartbeat
> at
> org.apache.hudi.client.heartbeat.HoodieHeartbeatClient.updateHeartbeat(HoodieHeartbeatClient.java:270)
> at
> org.apache.hudi.client.heartbeat.HoodieHeartbeatClient.start(HoodieHeartbeatClient.java:177)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.startCommit(BaseHoodieWriteClient.java:1010)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:998)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:981)
> at
> org.apache.hudi.internal.DataSourceInternalWriterHelper.<init>(DataSourceInternalWriterHelper.java:66)
> at
> org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite.<init>(HoodieDataSourceInternalBatchWrite.java:64)
> at
> org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWriteBuilder.buildForBatch(HoodieDataSourceInternalBatchWriteBuilder.java:61)
> at
> org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
> at
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
> at
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
> at
> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
> at
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
> at
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:370)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:607)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:185)
> at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
> at
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
> at
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
> at
> org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
> at
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
> at $anonfun$query$1(<console>:69)
> at $anonfun$query$1$adapted(<console>:45)
> at
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
> ... 1 more
> Caused by: java.io.IOException: Upload failed for
> 'ABCD_PATH/multi-writer-testing/tbl_bi/.hoodie/.heartbeat/20230826033005272'
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(BaseAbstractGoogleAsyncWriteChannel.java:260)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.close(BaseAbstractGoogleAsyncWriteChannel.java:168)
> at java.nio.channels.Channels$1.close(Channels.java:178)
> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
> at
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.close(GoogleHadoopOutputStream.java:119)
> at
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> at
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
> at
> org.apache.hudi.client.heartbeat.HoodieHeartbeatClient.updateHeartbeat(HoodieHeartbeatClient.java:258)
> ... 78 more
> Caused by:
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException:
> 412 Precondition Failed
> PUT
> https://storage.googleapis.com/upload/storage/v1/b/ABCD_PATH/o?ifGenerationMatch=0&name=multi-writer-testing/tbl_bi_mw/.hoodie/.heartbeat/20230826033005272&uploadType=resumable&upload_id=ADPycdvJgoESWSRlT-ZacrP1zxfT40GEz8FmOFh0zzyduDiHGVJbYHznxXSvD-B0aE_yioDNPrNnMDvQUCjqCD_tcDeiM3f3YwQJ
> {
> "code" : 412,
> "errors" : [ {
> "domain" : "global",
> "location" : "If-Match",
> "locationType" : "header",
> "message" : "At least one of the pre-conditions you specified did not
> hold.",
> "reason" : "conditionNotMet"
> } ],
> "message" : "At least one of the pre-conditions you specified did not hold."
> }
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:543)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:466)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:576)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:85)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750) {code}
>
> On restart, the pipeline resumes without any issues.
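> From the caused-by chain, the failure surfaces when HoodieHeartbeatClient
> closes the heartbeat file's output stream and the GCS connector performs the
> actual upload at close time with ifGenerationMatch=0 (visible in the failed
> PUT above), i.e. the object is expected not to exist yet. A minimal sketch of
> that write path, assuming plain Hadoop FileSystem calls (the exact Hudi
> internals, including the overwrite flag it passes, are assumptions here):
> {code:scala}
> // Sketch (not the actual HoodieHeartbeatClient code) of the heartbeat write:
> // create a small marker file under .hoodie/.heartbeat/<instant> and close it.
> // On GCS the object is only materialized at close(), so that is where the
> // 412 Precondition Failed from the conditional upload is thrown.
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
>
> val basePath = new Path("gs://ABCD_PATH/multi-writer-testing/tbl_bi") // placeholder
> val instantTime = "20230826033005272"
> val heartbeatFile = new Path(basePath, s".hoodie/.heartbeat/$instantTime")
>
> val fs: FileSystem = heartbeatFile.getFileSystem(new Configuration())
> val out = fs.create(heartbeatFile, /* overwrite = */ true) // overwrite flag is an assumption
> out.close() // GCS connector uploads here; the 412 precondition error is raised at this point
> {code}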
>