[ 
https://issues.apache.org/jira/browse/SPARK-22305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221982#comment-16221982
 ] 

Yuval Itzchakov commented on SPARK-22305:
-----------------------------------------

[~zsxwing] A typical query takes ~ 0.5 seconds to execute, so about 120 
batches. 

I discovered something else which relates to the StackOverflow. It seems the 
way that `HDFSBackedStateStoreProvider` is implemented is that there is a 
correlation between the file name of the latest offset and the state store 
version Spark looks for.

But assume the following situation:

1. The state was modified in such a way that it is not backwards compatible 
with the state in the store
2. Due to 1, we delete the state in the store but keep the offset files
3. Spark starts, trying to recover the previous state, and takes the latest 
state version from the latest offset file
4. The state store recursively starts searching for the version of the state 
(which no longer exists), ending on a StackOverflowException.

Would it be possible to separate the two (state and offsets) such that it would 
be possible to automatically start with the latest stored offsets but without 
the state?

> HDFSBackedStateStoreProvider fails with StackOverflowException when 
> attempting to recover state
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22305
>                 URL: https://issues.apache.org/jira/browse/SPARK-22305
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Yuval Itzchakov
>
> Environment:
> Spark: 2.2.0
> Java version: 1.8.0_112
> spark.sql.streaming.minBatchesToRetain: 100
> After an application failure due to OOM exceptions, restarting the 
> application with the existing state produces the following OOM:
> {code:java}
> java.io.IOException: com.google.protobuf.ServiceException: 
> java.lang.StackOverflowError
>       at 
> org.apache.hadoop.ipc.ProtobufHelper.getRemoteException(ProtobufHelper.java:47)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:260)
>       at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1240)
>       at 
> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1227)
>       at 
> org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1215)
>       at 
> org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:303)
>       at 
> org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:269)
>       at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:261)
>       at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1540)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>       at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>       at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$readSnapshotFile(HDFSBackedStateStoreProvider.scala:405)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:296)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:295)
>       at scala.Option.getOrElse(Option.scala:121)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:295)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:297)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:296)
>       at scala.Option.getOrElse(Option.scala:121)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:296)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:295)
>       at scala.Option.getOrElse(Option.scala:121)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:295)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:297)
>       at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$4.apply(HDFSBackedStateStoreProvider.scala:296)
>       at scala.Option.getOrElse(Option.scala:121)
> {code}
> There were 2 snapshot files in the state directory and 230 delta files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to