[ https://issues.apache.org/jira/browse/SPARK-40700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Puneet Sharma updated SPARK-40700:
----------------------------------
    Affects Version/s: 3.2.1
                           (was: 3.2.0)

> Spark S3 Checkpointing error after enabling RocksDb
> ---------------------------------------------------
>
>                 Key: SPARK-40700
>                 URL: https://issues.apache.org/jira/browse/SPARK-40700
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.2.1
>         Environment: AWS ROSA OpenShift, AWS S3, RocksDB
>            Reporter: Puneet Sharma
>            Priority: Major
>
> We are running a stateful Spark Structured Streaming application on an OpenShift cluster 
> and use Amazon S3 for checkpointing. The RocksDB state store has also been enabled 
> with the configuration "spark.sql.streaming.stateStore.providerClass" => 
> "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider".
> However, we intermittently see errors while checkpointing state to S3, and as a 
> result the executor pods go into an error state.
>  
>  
> {{2022-09-30T07:41:12+0000 [ERROR] [RocksDBFileManager StateStoreId(opId=0,partId=15,name=default)]: Error zipping to s3a://event-analytics-engine-4-bucket/checkpoint_violationAggregation/state/0/15/4.zip
>     /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/MANIFEST-000009
>     /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/OPTIONS-000012
>     /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/CURRENT
>     /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/000010.log
>     /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/metadata
> java.io.FileNotFoundException: /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/MANIFEST-000009 (No such file or directory)
>     at java.io.FileInputStream.open(FileInputStream.java:212) ~[?:1.8.0]
>     at java.io.FileInputStream.<init>(FileInputStream.java:152) ~[?:1.8.0]
>     at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.$anonfun$zipToDfsFile$1(RocksDBFileManager.scala:442) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.$anonfun$zipToDfsFile$1$adapted(RocksDBFileManager.scala:440) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.15.jar:?]
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.15.jar:?]
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.15.jar:?]
>     at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.zipToDfsFile(RocksDBFileManager.scala:440) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.saveCheckpointToDfs(RocksDBFileManager.scala:172) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.state.RocksDB.$anonfun$commit$12(RocksDB.scala:265) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
>     at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.state.RocksDB.timeTakenMs(RocksDB.scala:479) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.state.RocksDB.commit(RocksDB.scala:265) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.commit(RocksDBStateStoreProvider.scala:93) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$processDataWithPartition$5(FlatMapGroupsWithStateExec.scala:190) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
>     at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:142) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:142) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.timeTakenMs(FlatMapGroupsWithStateExec.scala:53) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$processDataWithPartition$4(FlatMapGroupsWithStateExec.scala:190) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513) ~[scala-library-2.12.15.jar:?]
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
>     at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
>     at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) [spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) [spark-core_2.12-3.2.1.jar:3.2.1]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160) [?:1.8.0]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:1.8.0]
>     at java.lang.Thread.run(Thread.java:830) [?:2.9 (05-11-2022)]
> 2022-09-30T07:41:12+0000 [WARN] [BlockManager]: Putting block rdd_1342_15 failed due to exception java.lang.NullPointerException.
> 2022-09-30T07:41:12+0000 [WARN] [BlockManager]: Block rdd_1342_15 could not be removed as it was not found on disk or in memory}}
>  
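The innermost frames of the trace show where the failure happens: `RocksDBFileManager.zipToDfsFile` iterates over a buffer of local checkpoint files (`ArrayBuffer.foreach`) and opens each one with `new FileInputStream(...)`, which throws because `MANIFEST-000009` is no longer present in the local checkpoint directory. The Java sketch below is a simplified, hypothetical stand-in for that loop built only on `java.util.zip`; it is not Spark's code, and the names `ZipCheckpointSketch` and `zipFiles` are invented for illustration. It only demonstrates that a file which is listed and then removed before being zipped surfaces as this kind of `FileNotFoundException`; it does not explain why the file disappeared in the reporter's environment.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical, simplified stand-in for the zip loop named in the stack trace.
// This is NOT Spark's RocksDBFileManager implementation.
public class ZipCheckpointSketch {

    // Zips each listed local file into zipFile; returns the uncompressed byte count.
    public static long zipFiles(List<File> files, File zipFile) throws IOException {
        long totalBytes = 0;
        try (ZipOutputStream zout = new ZipOutputStream(Files.newOutputStream(zipFile.toPath()))) {
            for (File file : files) {
                zout.putNextEntry(new ZipEntry(file.getName()));
                // If the file was listed but has since been removed from local disk,
                // this constructor throws FileNotFoundException (cf. FileInputStream.open in the trace).
                try (InputStream in = new FileInputStream(file)) {
                    totalBytes += copy(in, zout);
                }
                zout.closeEntry();
            }
        }
        return totalBytes;
    }

    // Java 8 compatible stream copy (InputStream.transferTo requires Java 9+).
    private static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buf = new byte[8192];
        long copied = 0;
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
            copied += n;
        }
        return copied;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("checkpoint-sketch");
        File manifest = dir.resolve("MANIFEST-000009").toFile();
        File current = dir.resolve("CURRENT").toFile();
        Files.write(manifest.toPath(), "manifest".getBytes(StandardCharsets.UTF_8));
        Files.write(current.toPath(), "MANIFEST-000009\n".getBytes(StandardCharsets.UTF_8));

        // Step 1: capture the list of files to upload.
        List<File> listed = new ArrayList<>();
        listed.add(manifest);
        listed.add(current);

        // Step 2: simulate a listed file disappearing before it is zipped.
        Files.delete(manifest.toPath());

        // Step 3: zipping now fails on the missing file.
        try {
            zipFiles(listed, dir.resolve("4.zip").toFile());
            System.out.println("zipped without error");
        } catch (FileNotFoundException e) {
            System.out.println("java.io.FileNotFoundException: " + e.getMessage());
        }
    }
}
```

On Linux, the printed message has the same `<path> (No such file or directory)` shape as the exception in the log above.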



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
