[ 
https://issues.apache.org/jira/browse/FLINK-23388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17383342#comment-17383342
 ] 

Timo Walther edited comment on FLINK-23388 at 7/19/21, 1:33 PM:
----------------------------------------------------------------

Thanks for opening this issue and for the deep investigation. We should try to
split the problem into separate issues. The way the Scala API deals with
types (e.g. ScalaCaseClassSerializer and others) has caused a lot of problems
in the past. This is also why we tried a different approach in the Table API
that handles Java and Scala classes in the same way, based on reflection/byte
code information.

Actually, the Flink community would like to avoid JVM serialization if 
possible. This is esp. important when state is involved and we want to be in 
control of the savepoint format.

It would be great if you could share more "reflective" information with us.
What does the Scala class hierarchy for {{A}} in your example look like
(fields, methods, signatures)? Basically, what Java class would lead to the
same byte code as the Scala REPL class? Why is Flink not able to serialize the
outer class that has been added implicitly? Is it because it is not a valid
POJO?
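
For comparison, a rough Java analogue of a REPL-wrapped class might look like the sketch below. The names {{Wrapper}}, {{A}}, {{roundTrip}} and {{hasHiddenOuterField}} are hypothetical, and this is not the byte code the Scala REPL actually emits. It only shows, under those assumptions, that plain JVM serialization carries the enclosing instance along with a non-static inner class, and that the reference to that instance shows up as a synthetic field via reflection.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;

public class InnerClassSerializationSketch {

    // Hypothetical stand-in for the serialisable wrapper a class-based REPL puts around user code.
    static class Wrapper implements Serializable {
        int x = 0;

        // Non-static member class: each instance keeps a hidden reference to its Wrapper.
        class A implements Serializable {
            final int y;

            A(int y) {
                this.y = y;
            }

            int z() {
                return x; // reads state of the enclosing Wrapper instance
            }
        }
    }

    // Serialises a value with plain JVM serialization and reads it back.
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    // Returns true if clazz declares a compiler-generated field whose type is the given outer class.
    static boolean hasHiddenOuterField(Class<?> clazz, Class<?> outer) {
        for (Field field : clazz.getDeclaredFields()) {
            if (field.isSynthetic() && field.getType() == outer) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Wrapper wrapper = new Wrapper();
        Wrapper.A a = wrapper.new A(2);
        wrapper.x = 1;

        Wrapper.A copy = roundTrip(a);
        System.out.println(copy.y + " " + copy.z()); // prints "2 1"
        System.out.println(hasHiddenOuterField(Wrapper.A.class, Wrapper.class)); // prints "true"
    }
}
```

The deserialised copy still sees {{x}} because the enclosing {{Wrapper}} instance travels with it. A serializer that only looks at the declared member {{y}} would not know about that synthetic outer reference, which is the kind of detail the reflective information should confirm or refute for the Scala case.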


> Non-static Scala case classes cannot be serialised
> --------------------------------------------------
>
>                 Key: FLINK-23388
>                 URL: https://issues.apache.org/jira/browse/FLINK-23388
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System, Scala Shell
>            Reporter: Toby Miller
>            Priority: Major
>
> h3. Explanation of the issue
> {{ScalaCaseClassSerializer}} is not powerful enough to serialise all Scala
> case classes that can be serialised by normal JVM serialisation (and by the
> standard {{KryoSerializer}}). This is because it treats all case classes as
> made up only of their listed members. In fact, it is valid Scala to have
> other data in a case class, and in particular, it is possible for a nested
> case class to depend on data in its parent. This might be ill-advised, but it
> is valid Scala:
> {code:scala}
> class Outer {
>     var x = 0
>     case class Inner(y: Int) {
>         def z = x
>     }
> }
> val outer = new Outer()
> val inner = outer.Inner(1)
> outer.x = 2
> scala> inner.z
> res0: Int = 2
> {code}
> As of Scala 2.11, the compiler flag {{-Yrepl-class-based}} is available
> (and it defaults to on in Scala 2.13). It changes the way the Scala REPL and
> similar tools encapsulate the user code written in the REPL, wrapping it in
> a serialisable class rather than an object (static class). The purpose of
> this is to aid serialisation of the whole thing for distributed REPL systems 
> that need to perform computation on remote nodes. One such application is 
> {{flink-scala-shell}}, and another is Apache Zeppelin. See below for an 
> illustration of the flag's importance in applications like these.
> In the JVM, a class can be serialised if it implements {{Serializable}}, and
> either it is static or its parent (the enclosing instance) can be serialised.
> In the latter case the parent is brought with it to allow it to be
> reconstructed in its context at the other end.
> The Flink {{ScalaCaseClassSerializer}} does not understand that case classes 
> (which always implement {{Serializable}}) might be nested inside a 
> serialisable outer class. This is exactly the scenario that occurs when 
> defining a supposedly top-level case class in {{flink-scala-shell}} or 
> Zeppelin, because {{-Yrepl-class-based}} causes it to be nested inside a 
> serialisable outer class. The consequence is that Scala case classes defined 
> in one of these applications cannot be serialised in Flink, making them 
> practically unusable. As a core Scala language feature, this is a serious 
> omission from these projects.
> h3. Fixing {{ScalaCaseClassSerializer}} - no free lunch
> I attempted to fix the issue by redirecting case classes in Flink to the 
> standard {{KryoSerializer}} rather than the {{ScalaCaseClassSerializer}}, and 
> at first glance this appeared to work very well. I was even able to run code 
> in Zeppelin that sent a user-defined case class to be processed in a Flink 
> job using the batch environment, and it worked well.
> Unfortunately, it didn't work when I tried to do the same thing using the 
> streaming table environment. The error as presented was a failure to cast 
> {{KryoSerializer}} to {{TupleSerializerBase}}, the superclass serialiser for 
> tuples, products, and {{ScalaCaseClassSerializer}}. The Flink Table API
> assumes that case classes will be assigned a serialiser capable of converting
> to and from a table representation. This amounts to a strong assumption that
> no case class instance will ever carry data besides its primary contents.
> In the case we're most interested in (the REPL class wrapper one), most case 
> classes will not actually depend on additional data, but it's difficult for 
> Flink to know that. In general, I imagine we would like to support cases 
> where the case class does depend on additional data, although clearly Flink 
> would be unable to turn it into a record and then back again. Perhaps we
> could do all the other operations without issue, but then raise an error if
> any operation attempted the latter transformation while additional data was
> missing?
> h3. Illustration of the importance of {{-Yrepl-class-based}}
> As an illustration of why the flag is important, consider the following REPL
> interaction:
> {code:scala}
> val x = 1
> case class A(y: Int) {
>     def z = x
> }
> val a = A(2)
> compute_remotely(() => a.z)
> {code}
> The instance {{a}} requires the context {{x}} to be brought with it in order
> to perform the computation {{a.z}}, but if the REPL places the user code in a
> static object, then a simple serialisation of {{a}} will not take {{x}} with
> it.
> However, with the {{-Yrepl-class-based}} flag active, {{A}} is now a nested 
> class which depends on the outer serialisable class of global state. {{x}} is 
> now automatically transferred as part of JVM serialisation and 
> deserialisation, and the REPL doesn't need to jump through any additional 
> hoops to make this work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
