[ 
https://issues.apache.org/jira/browse/SPARK-17372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-17372.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 2.1.0
                   2.0.1

Issue resolved by pull request 14987
[https://github.com/apache/spark/pull/14987]

> Running a file stream on a directory with partitioned subdirs throws 
> NotSerializableException/StackOverflowError
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-17372
>                 URL: https://issues.apache.org/jira/browse/SPARK-17372
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL, Streaming
>    Affects Versions: 2.0.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>             Fix For: 2.0.1, 2.1.0
>
>
> When we create a file stream on a directory that has partitioned subdirs (i.e. 
> dir/x=y/), ListingFileCatalog.allFiles returns the files in the dir as a 
> Seq[String] which internally is a Stream[String]. This is because of this 
> [line|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L93],
>  where LinkedHashSet.values.toSeq returns a Stream. Then when the 
> [FileStreamSource|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L79]
>  filters this Stream[String] to remove the seen files, it creates a new 
> Stream[String] whose filter function holds a $outer reference to the 
> FileStreamSource (in Scala 2.10). Trying to serialize this Stream[String] 
> causes a NotSerializableException. This happens even if there is just one 
> file in the dir.
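> To make the 2.10 failure mode concrete, here is a minimal sketch (the Source 
> class and file names below are hypothetical stand-ins, not Spark code): 
> Stream.filter is lazy, so the predicate closure, together with its $outer 
> pointer to the enclosing instance, lives on in the resulting Stream's 
> unevaluated tail and gets pulled into serialization.
> {code:scala}
> import java.io.{ByteArrayOutputStream, ObjectOutputStream}
>
> class Source {  // note: not Serializable
>   private val seen = Set("a")
>   def unseen(files: Stream[String]): Stream[String] = {
>     val seenFiles = seen  // a 2.11 lambda captures only this local val,
>                           // but a 2.10 anonymous class also keeps an $outer
>     files.filter(f => !seenFiles.contains(f))  // lazy: predicate is retained
>   }
> }
>
> val s: Seq[String] = new Source().unseen(Stream("a", "b", "c"))
> val oos = new ObjectOutputStream(new ByteArrayOutputStream())
> oos.writeObject(s)  // Scala 2.10: java.io.NotSerializableException: Source
> {code}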
> It is important to note that this behavior is different in Scala 2.11. There is 
> no $outer reference to the FileStreamSource, so it does not throw a 
> NotSerializableException. However, with a large sequence of files (tested 
> with 10000 files), it throws a StackOverflowError. This is because of how the 
> Stream class is implemented: it is essentially a linked list, and attempting to 
> serialize a long Stream requires *recursively* walking that linked list, 
> thus resulting in a StackOverflowError.
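> The StackOverflowError half can be reproduced in isolation (a sketch, 
> independent of Spark): Stream has no custom writeObject, so default Java 
> serialization descends one stack frame per cons cell.
> {code:scala}
> import java.io.{ByteArrayOutputStream, ObjectOutputStream}
>
> // 10000 lazily-built cons cells, mirroring the 10000-file test above
> val files: Stream[String] = Stream.range(0, 10000).map(i => s"file-$i")
> val oos = new ObjectOutputStream(new ByteArrayOutputStream())
> oos.writeObject(files)  // java.lang.StackOverflowError
> {code}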
> In short, across both Scala 2.10 and 2.11, serialization fails when both of 
> the following conditions are true: 
> - the file stream is defined on a partitioned directory  
> - the directory has 10k+ files
> The right solution is to convert the seq to an array before writing to the 
> log, as sketched below.
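> A sketch of that fix (illustrative only; see pull request 14987 for the 
> actual change): .toArray forces the Stream into a strict, flat array that 
> carries no closure fields and no recursive structure.
> {code:scala}
> import java.io.{ByteArrayOutputStream, ObjectOutputStream}
>
> val files: Seq[String] = Stream.range(0, 10000).map(i => s"file-$i")
> val oos = new ObjectOutputStream(new ByteArrayOutputStream())
> oos.writeObject(files.toArray)  // flat Array[String] serializes fine
> {code}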


