[ 
https://issues.apache.org/jira/browse/SPARK-31931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adrian Jones updated SPARK-31931:
---------------------------------
    Description: 
Structured streaming checkpointing does not work with Google Cloud Storage when 
there are aggregations included in the streaming pipeline.

Using GCS as the external store works fine when there are no aggregations 
present in the pipeline (e.g. groupBy); however, once an aggregation is 
introduced, the attached error is thrown.

The error is only thrown when aggregating and pointing checkpointLocation to 
GCS. The exact same code works fine when pointing checkpointLocation to HDFS.

Is it expected for GCS to function as a checkpoint location for aggregated 
pipelines? Are efforts currently in progress to enable this? Is it on a roadmap?
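
For illustration only, the kind of pipeline described above might look like the 
following PySpark sketch. It is not the reporter's code: the "rate" source, the 
console sink, the bucket name, and the helper name start_aggregated_query are 
all assumptions made for the example. The only difference between the failing 
and the working run would be the value passed as checkpointLocation.

```python
# Hypothetical checkpoint locations; the bucket and paths are placeholders,
# not values taken from the report.
CHECKPOINT_LOCATIONS = {
    "gcs": "gs://example-bucket/checkpoints/agg-query",
    "hdfs": "hdfs:///tmp/checkpoints/agg-query",
}


def start_aggregated_query(spark, checkpoint_location):
    """Start a streaming query containing a groupBy aggregation.

    `spark` is an existing SparkSession; creating it is left to the caller.
    """
    # The built-in "rate" source stands in for the real input, which the
    # report does not show. It produces `timestamp` and `value` columns.
    events = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

    # The groupBy makes the query stateful, so Spark persists aggregation
    # state under the checkpoint location in addition to offsets and commits.
    counts = events.groupBy((events.value % 10).alias("bucket")).count()

    return (
        counts.writeStream
        .outputMode("complete")
        .format("console")
        .option("checkpointLocation", checkpoint_location)
        .start()
    )
```

Calling start_aggregated_query(spark, CHECKPOINT_LOCATIONS["hdfs"]) and then the 
same function with CHECKPOINT_LOCATIONS["gcs"] would exercise the two cases 
compared in the description.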

 

  was:
Structured streaming checkpointing does not work with Google Cloud Storage when 
there are aggregations included in the streaming pipeline.

Using GCS as the external store works fine when there are no aggregations 
present in the pipeline (i.e. groupBy); however, once an aggregation is 
introduced, the below error is thrown.

The error is only thrown when aggregating and pointing checkpointLocation to 
GCS. The exact code works fine when pointing checkpointLocation to HDFS.

Is it expected for GCS to function as a checkpoint location for aggregated 
pipelines? Are efforts currently in progress to enable this? Is it on a roadmap?

org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 612a550b-992b-41cb-82f9-a95c12c51379, runId = 90a8e64a-5f64-4bd0-90e7-5df14630c577] terminated with exception: Writing job aborted.
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:302)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Writing job aborted.
  at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3389)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2788)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3370)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3370)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:540)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:536)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:535)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:198)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:166)
  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
  ... 1 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 12, spark-structured-streaming-w-0.c.pso-wmt-sandbox.internal, executor 1): org.apache.spark.util.TaskCompletionListenerException: null
  at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
  at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
  at org.apache.spark.scheduler.Task.run(Task.scala:139)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1892)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1880)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1879)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1879)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:927)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:927)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2113)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2062)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2051)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
  at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
  ... 34 more
Caused by: org.apache.spark.util.TaskCompletionListenerException: null
  at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
  at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
  at org.apache.spark.scheduler.Task.run(Task.scala:139)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)


> When using GCS as checkpoint location for Structured Streaming aggregation 
> pipeline, the Spark writing job is aborted
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-31931
>                 URL: https://issues.apache.org/jira/browse/SPARK-31931
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.5
>         Environment: GCP Dataproc 1.5 Debian 10 (Hadoop 2.10.0, Spark 2.4.5, 
> Cloud Storage Connector hadoop2.2.1.3, Scala 2.12.10)
>            Reporter: Adrian Jones
>            Priority: Blocker
>         Attachments: spark-structured-streaming-error
>
>
> Structured streaming checkpointing does not work with Google Cloud Storage 
> when there are aggregations included in the streaming pipeline.
> Using GCS as the external store works fine when there are no aggregations 
> present in the pipeline (e.g. groupBy); however, once an aggregation is 
> introduced, the attached error is thrown.
> The error is only thrown when aggregating and pointing checkpointLocation to 
> GCS. The exact same code works fine when pointing checkpointLocation to HDFS.
> Is it expected for GCS to function as a checkpoint location for aggregated 
> pipelines? Are efforts currently in progress to enable this? Is it on a 
> roadmap?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
