Puneet Sharma created SPARK-40700:
-------------------------------------

             Summary: Spark S3 Checkpointing error after enabling RocksDb
                 Key: SPARK-40700
                 URL: https://issues.apache.org/jira/browse/SPARK-40700
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.2.0
         Environment: AWS ROSA OpenShift, AWS S3, RocksDB
            Reporter: Puneet Sharma


We are running a stateful Spark Structured Streaming application on an 
OpenShift cluster, using Amazon S3 for checkpointing. RocksDB has also been 
enabled as the state store through the configuration 
"spark.sql.streaming.stateStore.providerClass" => 
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider".

However, we are seeing intermittent errors during state checkpointing to S3. 
As a result, the executor pods go into an error state. 
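
For anyone trying to reproduce the setup, the configuration described above 
could be passed to spark-submit roughly as follows. This is only a sketch: the 
helper names (streaming_conf, to_submit_args) are hypothetical, and the 
checkpoint path is inferred from the s3a:// prefix in the log rather than 
stated in the report. "spark.sql.streaming.checkpointLocation" sets a default 
checkpoint root; the application may instead set "checkpointLocation" per 
query, which the report does not say.

```python
import shlex

# Provider class quoted in the report.
ROCKSDB_PROVIDER = (
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)


def streaming_conf(checkpoint_root):
    """Return Spark settings for a RocksDB state store and a checkpoint root.

    Hypothetical helper: the report only states the provider class; using the
    default-checkpoint-location setting is an assumption.
    """
    return {
        "spark.sql.streaming.stateStore.providerClass": ROCKSDB_PROVIDER,
        "spark.sql.streaming.checkpointLocation": checkpoint_root,
    }


def to_submit_args(conf):
    """Turn a settings dict into a flat list of spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    # Path assumed from the log prefix; the real location may differ.
    conf = streaming_conf(
        "s3a://event-analytics-engine-4-bucket/checkpoint_violationAggregation"
    )
    print(shlex.join(["spark-submit", *to_submit_args(conf)]))
```

Running the script prints a spark-submit command line carrying both settings; 
the application jar and any S3A credentials options would still need to be 
appended.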

 

 
{{2022-09-30T07:41:12+0000 [ERROR] [RocksDBFileManager 
StateStoreId(opId=0,partId=15,name=default)]: Error zipping to 
s3a://event-analytics-engine-4-bucket/checkpoint_violationAggregation/state/0/15/4.zip
    
/var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/MANIFEST-000009
    
/var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/OPTIONS-000012
    
/var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/CURRENT
    
/var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/000010.log
    
/var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/metadata
java.io.FileNotFoundException: 
/var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/MANIFEST-000009
 (No such file or directory)
    at java.io.FileInputStream.open(FileInputStream.java:212) ~[?:1.8.0]
    at java.io.FileInputStream.<init>(FileInputStream.java:152) ~[?:1.8.0]
    at 
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.$anonfun$zipToDfsFile$1(RocksDBFileManager.scala:442)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.$anonfun$zipToDfsFile$1$adapted(RocksDBFileManager.scala:440)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
~[scala-library-2.12.15.jar:?]
    at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
~[scala-library-2.12.15.jar:?]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
~[scala-library-2.12.15.jar:?]
    at 
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.zipToDfsFile(RocksDBFileManager.scala:440)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.saveCheckpointToDfs(RocksDBFileManager.scala:172)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.sql.execution.streaming.state.RocksDB.$anonfun$commit$12(RocksDB.scala:265)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 
~[scala-library-2.12.15.jar:?]
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.sql.execution.streaming.state.RocksDB.timeTakenMs(RocksDB.scala:479)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.sql.execution.streaming.state.RocksDB.commit(RocksDB.scala:265)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.commit(RocksDBStateStoreProvider.scala:93)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$processDataWithPartition$5(FlatMapGroupsWithStateExec.scala:190)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 
~[scala-library-2.12.15.jar:?]
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:142)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:142)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.timeTakenMs(FlatMapGroupsWithStateExec.scala:53)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$processDataWithPartition$4(FlatMapGroupsWithStateExec.scala:190)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
 ~[spark-core_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) 
~[scala-library-2.12.15.jar:?]
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513) 
~[scala-library-2.12.15.jar:?]
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) 
~[scala-library-2.12.15.jar:?]
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) 
~[scala-library-2.12.15.jar:?]
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) 
~[scala-library-2.12.15.jar:?]
    at 
org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) 
~[scala-library-2.12.15.jar:?]
    at 
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
 ~[spark-core_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
 ~[spark-core_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
 ~[spark-core_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)
 ~[spark-sql_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.scheduler.Task.run(Task.scala:131) 
~[spark-core_2.12-3.2.1.jar:3.2.1]
    at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
 ~[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) 
[spark-core_2.12-3.2.1.jar:3.2.1]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) 
[spark-core_2.12-3.2.1.jar:3.2.1]
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160) 
[?:1.8.0]
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 
[?:1.8.0]
    at java.lang.Thread.run(Thread.java:830) [?:2.9 (05-11-2022)]
2022-09-30T07:41:12+0000 [WARN] [BlockManager]: Putting block rdd_1342_15 
failed due to exception java.lang.NullPointerException.
2022-09-30T07:41:12+0000 [WARN] [BlockManager]: Block rdd_1342_15 could not be 
removed as it was not found on disk or in memory}}
 


