[ 
https://issues.apache.org/jira/browse/FLINK-23979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414871#comment-17414871
 ] 

Frevib commented on FLINK-23979:
--------------------------------

The reified solution works very nicely. _reified_ keeps the generic type 
information, so the hacky solution would no longer be needed, for Kotlin 
at least. You mentioned it's difficult to keep multiple APIs (Scala, Java) in 
sync for the sole purpose of better type extraction. What would you estimate it 
would take to create _flink-core-kotlin_ and _flink-streaming-kotlin_? Almost 
all of _flink-....-java_ can be reused, and extension functions can replace the 
existing Java API. Just thinking out loud here. I am willing to put some 
time in.
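
To make the idea concrete, here is a minimal sketch of what such an extension 
function could look like. The name _keyByReified_ is hypothetical and this is 
not the code from the earlier discussion; it only assumes the existing 
DataStream.keyBy(KeySelector, TypeInformation) overload and 
TypeInformation.of(Class) from the Java API.

```kotlin
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.datastream.KeyedStream

// Hypothetical helper, not part of Flink: passes the key type explicitly so
// that the TypeExtractor does not have to infer it from the Kotlin lambda.
inline fun <T, reified K : Any> DataStream<T>.keyByReified(
    crossinline selector: (T) -> K
): KeyedStream<T, K> {
    // An anonymous object compiles to a regular class; KeySelector already
    // extends Serializable, so the closure cleaner can serialize it as long
    // as everything the selector captures is serializable too.
    val keySelector = object : KeySelector<T, K> {
        override fun getKey(value: T): K = selector(value)
    }
    // K is reified, so its class is available at the call site.
    // javaObjectType yields the boxed class for primitive-backed types.
    return keyBy(keySelector, TypeInformation.of(K::class.javaObjectType))
}
```

With the tutorial's stream this might be called as 
transactions.keyByReified { it.accountId }, assuming a DataStream named 
transactions. One limitation of this sketch: TypeInformation.of(Class) only 
sees the raw class, so a generic key type such as List<String> would still 
need explicit type information.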

  

[~twalthr] apologies, the commenter at 
[https://youtrack.jetbrains.com/issue/KT-48422#focus=Comments-27-5190111.0-0] 
and I are the same person :). Jira showed my name and not my nickname; I have 
changed that now.

> Exceptions with Kotlin 1.5.0 and higher
> ---------------------------------------
>
>                 Key: FLINK-23979
>                 URL: https://issues.apache.org/jira/browse/FLINK-23979
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.13.2
>            Reporter: Frevib
>            Priority: Major
>
> *Summary*
> keyBy(..) function triggers exceptions when using Kotlin. Different Kotlin 
> compiler versions give different exceptions.
>  
> *Reproduce*
> See below.
>  
> *Using Kotlin 1.5.20 and 1.5.30*
>  
> When using
> {code:java}
> .keyBy(...){code}
> the following runtime exception occurs: 
> {code:java}
> Exception in thread "main" 
> org.apache.flink.api.common.functions.InvalidTypesException: The types of the 
> interface org.apache.flink.api.java.functions.KeySelector could not be 
> inferred. Support for synthetic interfaces, lambdas, and generic or raw types 
> is limited at this point
>         at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1244)
>         at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getParameterTypeFromGenericType(TypeExtractor.java:1268)
>         at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1231)
>         at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:789)
>         at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)
>         at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:436)
>         at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:429)
>         at 
> org.apache.flink.streaming.api.datastream.KeyedStream.<init>(KeyedStream.java:118)
>         at 
> org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:296)
>         at FraudDetectionKt.main(FraudDetection.kt:23){code}
>  
> Update: this seemed to be an issue in Kotlin < 1.4.0 as well: 
> https://github.com/classpass/flink-kotlin#lambdas-kotlin-pre-14-and-invalidtypesexception
>  
> *Using Kotlin 1.5.0 – +FIXED in 1.5.10+*
> When using
> {code:java}
> .keyBy(...)
> {code}
> the following runtime exception occurs:
> {code:java}
> Exception in thread "main" 
> org.apache.flink.api.common.InvalidProgramException: Object 
> FraudDetectionKt$$Lambda$138/0x00000008001e6440@7d446ed1 is not serializable
>       at 
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:180)
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1901)
>       at 
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:189)
>       at 
> org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:296)
>       at FraudDetectionKt.main(FraudDetection.kt:23)
> {code}
> Using an older version of Kotlin, e.g. 1.4.32, this exception does not occur 
> and the program runs fine.
>  
> Some research points to this changelog entry, which might have something to 
> do with these exceptions:
> [https://kotlinlang.org/docs/whatsnew15.html#lambdas-via-invokedynamic]
>  
> *Reproduce*
> Use the code from the tutorial:
> [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
