[ https://issues.apache.org/jira/browse/SPARK-31931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Adrian Jones updated SPARK-31931:
---------------------------------
Description:
Structured Streaming checkpointing does not work with Google Cloud Storage when
there are aggregations included in the streaming pipeline.
Using GCS as the external store works fine when there are no aggregations
present in the pipeline (e.g. groupBy); however, once an aggregation is
introduced, the attached error is thrown.
The error is only thrown when aggregating and pointing checkpointLocation to
GCS. The same code works fine when pointing checkpointLocation to HDFS.
Is it expected for GCS to function as a checkpoint location for aggregated
pipelines? Are efforts currently in progress to enable this? Is it on a roadmap?
was:
Structured Streaming checkpointing does not work with Google Cloud Storage when
there are aggregations included in the streaming pipeline.
Using GCS as the external store works fine when there are no aggregations
present in the pipeline (e.g. groupBy); however, once an aggregation is
introduced, the error below is thrown.
The error is only thrown when aggregating and pointing checkpointLocation to
GCS. The same code works fine when pointing checkpointLocation to HDFS.
Is it expected for GCS to function as a checkpoint location for aggregated
pipelines? Are efforts currently in progress to enable this? Is it on a roadmap?
org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 612a550b-992b-41cb-82f9-a95c12c51379, runId = 90a8e64a-5f64-4bd0-90e7-5df14630c577] terminated with exception: Writing job aborted.
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:302)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Writing job aborted.
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3389)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2788)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3370)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:540)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:536)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:535)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:198)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    ... 1 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 12, spark-structured-streaming-w-0.c.pso-wmt-sandbox.internal, executor 1): org.apache.spark.util.TaskCompletionListenerException: null
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1892)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1880)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1879)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1879)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:927)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2113)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2062)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2051)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
    ... 34 more
Caused by: org.apache.spark.util.TaskCompletionListenerException: null
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
> When using GCS as checkpoint location for Structured Streaming aggregation
> pipeline, the Spark writing job is aborted
> ---------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-31931
> URL: https://issues.apache.org/jira/browse/SPARK-31931
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.5
> Environment: GCP Dataproc 1.5 Debian 10 (Hadoop 2.10.0, Spark 2.4.5,
> Cloud Storage Connector hadoop2.2.1.3, Scala 2.12.10)
> Reporter: Adrian Jones
> Priority: Blocker
> Attachments: spark-structured-streaming-error
>
>
> Structured Streaming checkpointing does not work with Google Cloud Storage
> when there are aggregations included in the streaming pipeline.
> Using GCS as the external store works fine when there are no aggregations
> present in the pipeline (e.g. groupBy); however, once an aggregation is
> introduced, the attached error is thrown.
> The error is only thrown when aggregating and pointing checkpointLocation to
> GCS. The same code works fine when pointing checkpointLocation to HDFS.
> Is it expected for GCS to function as a checkpoint location for aggregated
> pipelines? Are efforts currently in progress to enable this? Is it on a
> roadmap?
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)