[ https://issues.apache.org/jira/browse/FLINK-9654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zsolt Donca updated FLINK-9654:
-------------------------------
    Description: 
When you use custom `TypeSerializer` instances implemented in Scala, the 
Scala issue [SI-2034|https://issues.scala-lang.org/browse/SI-2034] can manifest 
itself when a Flink job is restored from a checkpoint or started from a savepoint.

The reason is that when restoring from a checkpoint or savepoint, Flink uses 
`InstantiationUtil.FailureTolerantObjectInputStream` to deserialize the type 
serializers and their configurations. The deserialization walks the entire 
corresponding object graph, and for each class it calls `isAnonymousClass` 
(a mechanism introduced for FLINK-6869), which, in turn, calls `getSimpleName`. 
If the graph contains a nested class defined inside a Scala object for which 
`getSimpleName` fails (see the Scala issue), a `java.lang.InternalError` is 
thrown, which causes the task manager to restart. Flink then tries to restart 
the job on another task manager, which fails in the same way, so eventually 
all the task managers restart, wreaking havoc on the entire Flink cluster.
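
A minimal Java sketch of the kind of defensive check that avoids the crash, assuming the anonymous-class test may swallow the `InternalError` and fall back to a class-name heuristic. `SafeAnonymousCheck`, `isAnonymousClass` and `looksLikeScalaAnonymousClass` here are hypothetical names for illustration only; this is not the code in the pull request, nor Flink's `InstantiationUtil` API.

```java
import java.util.Objects;

/**
 * Hypothetical helper (not Flink's actual API): an anonymous-class check that
 * tolerates the java.lang.InternalError that Class.getSimpleName(), and
 * therefore Class.isAnonymousClass() on JDK 8, can throw for certain
 * Scala-compiled nested classes (SI-2034).
 */
public final class SafeAnonymousCheck {

    private SafeAnonymousCheck() {}

    public static boolean isAnonymousClass(Class<?> clazz) {
        Objects.requireNonNull(clazz, "clazz");
        try {
            // On JDK 8 this is implemented via getSimpleName(), which may throw InternalError.
            return clazz.isAnonymousClass();
        } catch (InternalError e) {
            // Assumption: fall back to a name-based heuristic instead of letting
            // the error propagate and bring down the task manager.
            return looksLikeScalaAnonymousClass(clazz.getName());
        }
    }

    /** Heuristic on the binary class name; getName() does not go through getSimpleName(). */
    public static boolean looksLikeScalaAnonymousClass(String binaryName) {
        return binaryName.contains("$anon$") || binaryName.contains("$anonfun$");
    }

    public static void main(String[] args) {
        Runnable anonymous = new Runnable() {
            @Override
            public void run() {}
        };
        System.out.println(isAnonymousClass(anonymous.getClass()));  // true
        System.out.println(isAnonymousClass(String.class));          // false
        System.out.println(looksLikeScalaAnonymousClass("com.example.Serializers$$anon$1")); // true
    }
}
```

The name heuristic only covers the `$anon$` and `$anonfun$` patterns that scalac emits for anonymous classes and (in older Scala versions) closures; macro-generated classes with other naming schemes would need their own rule.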

Some alternative mechanisms for deriving type information rely on anonymous 
classes and, most importantly, on classes generated by macros, so they can 
easily trigger the above problem. I am personally working on 
[https://github.com/zsolt-donca/flink-alt], and there is also 
[https://github.com/joroKr21/flink-shapeless].

I have prepared a pull request that fixes the issue.

> Internal error while deserializing custom Scala TypeSerializer instances
> ------------------------------------------------------------------------
>
>                 Key: FLINK-9654
>                 URL: https://issues.apache.org/jira/browse/FLINK-9654
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Zsolt Donca
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
