[
https://issues.apache.org/jira/browse/SPARK-40700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Puneet Sharma updated SPARK-40700:
----------------------------------
Affects Version/s: 3.2.1
(was: 3.2.0)
> Spark S3 Checkpointing error after enabling RocksDB
> ---------------------------------------------------
>
> Key: SPARK-40700
> URL: https://issues.apache.org/jira/browse/SPARK-40700
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.2.1
> Environment: AWS ROSA OpenShift, AWS S3, RocksDB
> Reporter: Puneet Sharma
> Priority: Major
>
> We are running a stateful Spark Streaming application on an OpenShift cluster.
> We are using Amazon S3 for checkpointing. RocksDB has also been enabled
> using the configuration "spark.sql.streaming.stateStore.providerClass" =>
> "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider".
> However, we are seeing intermittent errors during state checkpointing to S3.
> As a result, the executor pods go into an error state.
>
>
> {{2022-09-30T07:41:12+0000 [ERROR] [RocksDBFileManager
> StateStoreId(opId=0,partId=15,name=default)]: Error zipping to
> s3a://event-analytics-engine-4-bucket/checkpoint_violationAggregation/state/0/15/4.zip
>
> /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/MANIFEST-000009
>
> /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/OPTIONS-000012
>
> /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/CURRENT
>
> /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/000010.log
>
> /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/metadata
> java.io.FileNotFoundException:
> /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/MANIFEST-000009
> (No such file or directory)
> at java.io.FileInputStream.open(FileInputStream.java:212) ~[?:1.8.0]
> at java.io.FileInputStream.<init>(FileInputStream.java:152) ~[?:1.8.0]
> at
> org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.$anonfun$zipToDfsFile$1(RocksDBFileManager.scala:442)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.$anonfun$zipToDfsFile$1$adapted(RocksDBFileManager.scala:440)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> ~[scala-library-2.12.15.jar:?]
> at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> ~[scala-library-2.12.15.jar:?]
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> ~[scala-library-2.12.15.jar:?]
> at
> org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.zipToDfsFile(RocksDBFileManager.scala:440)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.saveCheckpointToDfs(RocksDBFileManager.scala:172)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.sql.execution.streaming.state.RocksDB.$anonfun$commit$12(RocksDB.scala:265)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> ~[scala-library-2.12.15.jar:?]
> at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.sql.execution.streaming.state.RocksDB.timeTakenMs(RocksDB.scala:479)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.sql.execution.streaming.state.RocksDB.commit(RocksDB.scala:265)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.commit(RocksDBStateStoreProvider.scala:93)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$processDataWithPartition$5(FlatMapGroupsWithStateExec.scala:190)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> ~[scala-library-2.12.15.jar:?]
> at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:142)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:142)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.timeTakenMs(FlatMapGroupsWithStateExec.scala:53)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$processDataWithPartition$4(FlatMapGroupsWithStateExec.scala:190)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> ~[scala-library-2.12.15.jar:?]
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
> ~[scala-library-2.12.15.jar:?]
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> ~[scala-library-2.12.15.jar:?]
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> ~[scala-library-2.12.15.jar:?]
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> ~[scala-library-2.12.15.jar:?]
> at
> org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> ~[scala-library-2.12.15.jar:?]
> at
> org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)
> ~[spark-sql_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> ~[spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
> [spark-core_2.12-3.2.1.jar:3.2.1]
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
> [spark-core_2.12-3.2.1.jar:3.2.1]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160)
> [?:1.8.0]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> [?:1.8.0]
> at java.lang.Thread.run(Thread.java:830) [?:2.9 (05-11-2022)]
> 2022-09-30T07:41:12+0000 [WARN] [BlockManager]: Putting block rdd_1342_15
> failed due to exception java.lang.NullPointerException.
> 2022-09-30T07:41:12+0000 [WARN] [BlockManager]: Block rdd_1342_15 could not
> be removed as it was not found on disk or in memory}}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]