[ 
https://issues.apache.org/jira/browse/FLINK-23388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17421421#comment-17421421
 ] 

Toby Miller commented on FLINK-23388:
-------------------------------------

[~MartijnVisser] Yes, I saw; thanks for flagging. This ticket will still be 
relevant to Zeppelin and possibly other use cases, so I hope it's okay to 
proceed.

[~twalthr] I will try that when I next have time. Thanks.

> Non-static Scala case classes cannot be serialised
> --------------------------------------------------
>
>                 Key: FLINK-23388
>                 URL: https://issues.apache.org/jira/browse/FLINK-23388
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>            Reporter: Toby Miller
>            Priority: Major
>
> h3. Explanation of the issue
> {{ScalaCaseClassSerializer}} is not powerful enough to serialise all Scala 
> case classes that can be serialised by standard JVM serialisation (and by the 
> standard {{KryoSerializer}}). This is because it treats every case class as 
> made up only of its listed members. In fact, it is valid Scala for a case 
> class to carry other data. In particular, a nested case class can depend on 
> data in its enclosing instance. This might be ill-advised, but it is valid 
> Scala:
> {code:scala}
> class Outer {
>     var x = 0
>     case class Inner(y: Int) {
>         def z = x
>     }
> }
> val outer = new Outer()
> val inner = outer.Inner(1)
> outer.x = 2
> inner.z // returns 2: Inner reads x from the Outer instance that created it
> {code}
> Since Scala 2.11, the compiler flag {{-Yrepl-class-based}} has been available 
> (and it is on by default in Scala 2.13). It changes the way the Scala REPL and 
> similar tools encapsulate the user code written in the REPL, wrapping it in 
> a serialisable class rather than an object (static class). The purpose of 
> this is to let the whole thing be serialised by distributed REPL systems 
> that need to perform computation on remote nodes. Two such applications are 
> {{flink-scala-shell}} and Apache Zeppelin. See below for an 
> illustration of the flag's importance in applications like these.
> In the JVM, an instance of a class can be serialised if the class implements 
> {{Serializable}} and either the class is static or its enclosing (outer) 
> instance can also be serialised. In the latter case the enclosing instance is 
> serialised along with it, so that the object can be reconstructed in its 
> context at the other end.
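> The JVM rule above can be checked directly. The following is a minimal sketch, assuming plain JVM serialisation outside Flink. It round-trips the {{Inner}} instance from the earlier example through {{ObjectOutputStream}}/{{ObjectInputStream}}. {{JvmRoundTrip}}, {{roundTrip}} and {{demo}} are illustrative names. {{Outer}} is marked {{Serializable}} here, which the earlier example does not do. It stands in for the serialisable wrapper class that {{-Yrepl-class-based}} generates.
> {code:scala}
> import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
>
> // Assumption: Outer is Serializable, like the REPL's class-based wrapper.
> class Outer extends Serializable {
>     var x = 0
>     case class Inner(y: Int) {
>         def z = x
>     }
> }
>
> object JvmRoundTrip {
>     // Write a value with standard JVM serialisation and read it back.
>     def roundTrip[T](value: T): T = {
>         val bytes = new ByteArrayOutputStream()
>         val out = new ObjectOutputStream(bytes)
>         out.writeObject(value)
>         out.close()
>         val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
>         try in.readObject().asInstanceOf[T] finally in.close()
>     }
>
>     def demo(): (Int, Int) = {
>         val outer = new Outer()
>         // `new` is used so that the Inner companion object is never initialised;
>         // only Inner and its enclosing Outer need to be serialisable.
>         val inner = new outer.Inner(1)
>         outer.x = 2
>         // Serialising inner also serialises its enclosing Outer (with x = 2).
>         val copy = roundTrip(inner)
>         outer.x = 3 // changing the original Outer does not affect the copy
>         (copy.y, copy.z)
>     }
> }
> {code}
> {{JvmRoundTrip.demo()}} should return {{(1, 2)}}. The copy carries its own deserialised {{Outer}}, so {{z}} still sees {{x = 2}} after the original is modified.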
> The Flink {{ScalaCaseClassSerializer}} does not account for case classes 
> (which always implement {{Serializable}}) being nested inside a 
> serialisable outer class. This is exactly the scenario that occurs when 
> defining a supposedly top-level case class in {{flink-scala-shell}} or 
> Zeppelin, because {{-Yrepl-class-based}} causes it to be nested inside a 
> serialisable outer class. The consequence is that Scala case classes defined 
> in either of these applications cannot be serialised in Flink, making them 
> practically unusable. Since case classes are a core Scala language feature, 
> this is a serious omission from these projects.
> h3. Fixing {{ScalaCaseClassSerializer}}: no free lunch
> I attempted to fix the issue by redirecting case classes in Flink to the 
> standard {{KryoSerializer}} rather than the {{ScalaCaseClassSerializer}}, and 
> at first glance this appeared to work very well. I was even able to run code 
> in Zeppelin that sent a user-defined case class to be processed by a Flink 
> job using the batch environment, and the job ran successfully.
> Unfortunately, it didn't work when I tried to do the same thing using the 
> streaming table environment. The error reported was a failure to cast 
> {{KryoSerializer}} to {{TupleSerializerBase}}, the base serialiser class for 
> tuples, products, and {{ScalaCaseClassSerializer}}. The Flink Table API 
> assumes that case classes will be assigned a serialiser capable of converting 
> them to and from a table representation. This amounts to a strong assumption 
> that no case class instance will ever carry data besides its listed fields.
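> The effect of that assumption can be shown without Flink. The following is a hypothetical sketch, not Flink's serialiser code. It reduces an {{Inner}} to its listed fields, as a table row would, and then rebuilds it. A record has nowhere to keep the enclosing {{Outer}}, so the rebuild has to supply some {{Outer}}. Here a fresh one is used. {{FieldOnlyRoundTrip}}, {{toRecord}} and {{fromRecord}} are illustrative names.
> {code:scala}
> class Outer {
>     var x = 0
>     case class Inner(y: Int) {
>         def z = x
>     }
> }
>
> object FieldOnlyRoundTrip {
>     // Decompose: keep only the case class's listed members, as a row would.
>     def toRecord(inner: Outer#Inner): Seq[Any] = inner.productIterator.toSeq
>
>     // Reconstruct: a new Inner needs *some* Outer, but the record carries none,
>     // so a fresh Outer is created and the original outer state is lost.
>     def fromRecord(record: Seq[Any]): Outer#Inner = {
>         val freshOuter = new Outer()
>         new freshOuter.Inner(record.head.asInstanceOf[Int])
>     }
>
>     def demo(): (Int, Int) = {
>         val outer = new Outer()
>         val inner = new outer.Inner(1)
>         outer.x = 2
>         val rebuilt = fromRecord(toRecord(inner))
>         (rebuilt.y, rebuilt.z)
>     }
> }
> {code}
> {{FieldOnlyRoundTrip.demo()}} should return {{(1, 0)}}, although the original instance had {{z == 2}}. The listed field survives, but the data reached through the enclosing instance silently changes.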
> In the case we're most interested in (the REPL class wrapper), most case 
> classes will not actually depend on additional data, but it's difficult for 
> Flink to know that. In general, I imagine we would like to support cases 
> where the case class does depend on additional data, although clearly Flink 
> would be unable to turn such an instance into a record and then back again. 
> Perhaps all other operations could proceed without issue, with an error 
> raised only if an operation attempts that record-to-instance conversion 
> while the additional data is missing?
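> One possible reading of that proposal is sketched below, under stated assumptions. Flink would first need to tell which case classes might carry extra data. The helper is hypothetical and is not a Flink API. It relies on the Scala compiler's convention of storing the enclosing instance in a synthetic field named {{$outer}}. That naming is a compiler implementation detail, and the compiler may omit the field when the enclosing instance is never used. Only the record-to-instance direction is refused. Everything else is left alone.
> {code:scala}
> // Hypothetical helper, not part of Flink: flag case classes that hold a
> // reference to an enclosing instance (scalac's synthetic `$outer` field).
> object OuterCheck {
>     def capturesOuter(cls: Class[_]): Boolean =
>         cls.getDeclaredFields.exists(_.getName == "$outer")
>
>     // Field-only operations stay allowed; rebuilding an instance from a record
>     // fails loudly instead of silently inventing a new enclosing instance.
>     def requireRecordConvertible(cls: Class[_]): Unit =
>         if (capturesOuter(cls))
>             throw new UnsupportedOperationException(
>                 s"${cls.getName} depends on an enclosing instance and cannot be rebuilt from a record")
> }
>
> case class TopLevel(y: Int)
>
> class Outer {
>     var x = 0
>     case class Inner(y: Int) {
>         def z = x
>     }
> }
> {code}
> With this check, {{TopLevel}} would pass. {{Outer#Inner}} would be rejected only at the point where a record must be turned back into an instance.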
> h3. Illustration of the importance of {{-Yrepl-class-based}}
> As an illustration of why the flag is important, consider the following REPL 
> interaction:
> {code:scala}
> val x = 1
> case class A(y: Int) {
>     def z = x
> }
> val a = A(2)
> compute_remotely(() => a.z) // placeholder for any call that runs a closure on a remote node
> {code}
> The instance {{a}} requires the context {{x}} to be brought with it in order 
> to perform the computation {{a.z}}, but if the REPL places the user code 
> statically, then a simple serialisation of {{a}} will not take {{x}} with it.
> However, with the {{-Yrepl-class-based}} flag active, {{A}} is now a nested 
> class which depends on the serialisable outer class holding the global state. 
> {{x}} is then automatically transferred as part of JVM serialisation and 
> deserialisation, and the REPL doesn't need to jump through any additional 
> hoops to make this work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
