[
https://issues.apache.org/jira/browse/FLINK-23979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17414044#comment-17414044
]
Hank commented on FLINK-23979:
------------------------------
Discussion is also continued in [https://youtrack.jetbrains.com/issue/KT-48422]
, thanks for commenting there.
Are you planning Kotlin lambda support? Kotlin supports accessing type
parameters using the `reified` keyword:
[https://kotlinlang.org/docs/inline-functions.html#reified-type-parameters] ,
which enables creating an extension function like this that enables lambdas to
work with _keyBy_:
{code:java}
inline fun <T, reified K> DataStream<T>.keyByKotlin(noinline function: (T) ->
K): KeyedStream<T, K> =
keyBy(function, TypeInformation.of(K::class.java))
{code}
There is also an initiative to support Kotlin by using extension functions
here:
[https://github.com/classpass/flink-kotlin#lambdas-kotlin-pre-14-and-invalidtypesexception]
, but it doesn't seem very active.
> Exceptions with Kotlin 1.5.0 and higher
> ---------------------------------------
>
> Key: FLINK-23979
> URL: https://issues.apache.org/jira/browse/FLINK-23979
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.13.2
> Reporter: Hank
> Priority: Major
>
> *Summary*
> keyBy(..) function triggers exceptions when using Kotlin. Different Kotlin
> compiler versions give different exceptions.
>
> *Reproduce*
> See below.
>
> *Using Kotlin 1.5.20 and 1.5.30*
>
> When using
> {code:java}
> .keyBy(...){code}
> the following runtime exception occurs:
> {code:java}
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: The types of the
> interface org.apache.flink.api.java.functions.KeySelector could not be
> inferred. Support for synthetic interfaces, lambdas, and generic or raw types
> is limited at this point
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1244)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getParameterTypeFromGenericType(TypeExtractor.java:1268)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1231)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:789)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:436)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:429)
> at
> org.apache.flink.streaming.api.datastream.KeyedStream.<init>(KeyedStream.java:118)
> at
> org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:296)
> at FraudDetectionKt.main(FraudDetection.kt:23){code}
>
> Update: this seemed to be an issue in Kotlin < 1.4.0 as well:
> https://github.com/classpass/flink-kotlin#lambdas-kotlin-pre-14-and-invalidtypesexception
>
> *Using Kotlin 1.5.0 – +FIXED in 1.5.10+*
> When using
> {code:java}
> .keyBy(...)
> {code}
> gives the following runtime exception:
> {code:java}
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: Object
> FraudDetectionKt$$Lambda$138/0x00000008001e6440@7d446ed1 is not serializable
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:180)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1901)
> at
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:189)
> at
> org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:296)
> at FraudDetectionKt.main(FraudDetection.kt:23)
> {code}
> Using and older version of Kotlin, e.g 1.4.32, this exception does not occur
> and the program runs fine.
>
> Some research points this change log that might have something to do with
> these exceptions?
> [https://kotlinlang.org/docs/whatsnew15.html#lambdas-via-invokedynamic]
>
> *Reproduce*
> Use the code from the tutorial:
> [https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/datastream/]
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)