[ 
https://issues.apache.org/jira/browse/FLINK-5964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169780#comment-16169780
 ] 

David Anderson commented on FLINK-5964:
---------------------------------------

Here's an example that compiles, but fails at runtime:

{code:java}
package kotflink

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.util.Collector

public class FlinkApp {
    companion object {
        @Throws(Exception::class)
        @JvmStatic public fun main(args: Array<String>) {
            val env = StreamExecutionEnvironment.getExecutionEnvironment()

            env
                .socketTextStream("localhost", 9999)
                .flatMap(LineSplitter())
                .keyBy{it.word}
                .sum("count")
                .print()

            env.execute()
        }

        data class Event(val word: String, val count: Int)

        class LineSplitter : FlatMapFunction<String, Event> {
            override fun flatMap(value: String, out: Collector<Event>) {
                val tokens = value.toLowerCase().split("\\W+".toRegex())
                    .dropLastWhile { it.isEmpty() }.toTypedArray()

                for (token in tokens) {
                    if (token.length > 0) {
                        out.collect(Event(token, 1))
                    }
                }
            }
        }
    }
}
{code}


{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.api.java.functions.KeySelector could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
        at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1270)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:822)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:622)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:443)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:436)
        at org.apache.flink.streaming.api.datastream.KeyedStream.<init>(KeyedStream.java:108)
        at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:263)
        at kotflink.FlinkApp$Companion.main(app.kt:18)
        at kotflink.FlinkApp.main(app.kt)
{noformat}
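
The exception seems to come down to what reflection can see. As far as I can tell, TypeExtractor reads the type arguments of KeySelector from the implementing class, and a lambda may not record them. Below is a minimal sketch of that difference using only JDK reflection. It makes two assumptions that are not part of Flink: java.util.function.Function (aliased as JFunction) stands in for KeySelector so the snippet runs without Flink on the classpath, and declaredTypeArguments is a hypothetical helper written for this illustration.

```kotlin
import java.lang.reflect.ParameterizedType
import java.util.function.Function as JFunction

// Same shape as the Event class in the example above.
data class Event(val word: String, val count: Int)

// Hypothetical helper: returns the type arguments a class declares for its
// first interface, or null when only the raw interface is recorded.
fun declaredTypeArguments(instance: Any): List<String>? {
    val iface = instance.javaClass.genericInterfaces.firstOrNull()
    return (iface as? ParameterizedType)?.actualTypeArguments?.map { it.typeName }
}

// An object expression compiles to a class that keeps its generic signature.
val explicitSelector = object : JFunction<Event, String> {
    override fun apply(value: Event): String = value.word
}

// A SAM-converted lambda; whether type arguments survive depends on how the
// compiler emits it, so no particular result is claimed here.
val lambdaSelector = JFunction<Event, String> { it.word }

fun main() {
    // Expected to list Event and java.lang.String.
    println(declaredTypeArguments(explicitSelector))
    // May print null when the lambda class only implements the raw interface.
    println(declaredTypeArguments(lambdaSelector))
}
```

If the same holds for KeySelector, passing an object expression such as `object : KeySelector<Event, String>` to keyBy might get past this particular error. I haven't verified that the rest of the job (for example sum("count") on a data class) then works.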


> Change TypeSerializers to allow construction of immutable types
> ---------------------------------------------------------------
>
>                 Key: FLINK-5964
>                 URL: https://issues.apache.org/jira/browse/FLINK-5964
>             Project: Flink
>          Issue Type: Improvement
>          Components: Core
>    Affects Versions: 2.0.0, 1.1.4
>            Reporter: Jayson Minard
>            Priority: Minor
>
> If your programming language has many immutable types (with no default
> constructor), Flink forces you to create read/write POJO versions of them;
> otherwise the types are rejected by the system.  In Kotlin, for example,
> given a class and property values, we can determine which constructor to
> call and invoke it using knowledge of default values, nullable types, and
> which properties can be set during or after construction.
> Flink provides no opportunity to use this model because serializers are
> littered with calls to `createInstance` that are not passed any values, so
> they have no opportunity to fully inflate the object on construction.
> This means that when you use Flink you throw away maybe hundreds of state
> objects (my worst case) and have to create Flink-only variations, which
> is grunt work that adds no value.
> Currently TypeExtractor isn't extensible, and all of the special cases are
> hard-coded.  The order in which type information is checked should be
> configurable so that pluggable types can be added to the chain of analysis.
> For example, before `analyzePojo` is called I could special-case a Kotlin
> class by returning a different TypeInformation instance.  But I don't think
> that solves the whole problem, since other TypeSerializers make assumptions
> and call `createInstance` on other TypeSerializers without knowing how they
> would want to do the construction (in the Kotlin case it would be "tell me
> to construct my instance, give me the list of named fields and serializers
> to get their values, and let me decide what to do").
> What is the best approach for this change?  With guidance, I can work on
> the PR.
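
To make the construction model described above concrete, here is a rough sketch of building an immutable Kotlin class from a map of named field values. It is not Flink code and not a proposed serializer API: WordCount and construct are hypothetical names for illustration, and the snippet assumes kotlin-reflect is on the classpath.

```kotlin
import kotlin.reflect.KClass
import kotlin.reflect.KParameter
import kotlin.reflect.full.primaryConstructor

// Hypothetical immutable state class: no default constructor, one default value.
data class WordCount(val word: String, val count: Int = 1)

// Builds an instance from named field values via the primary constructor.
// Optional parameters missing from `fields` fall back to their declared defaults.
fun <T : Any> construct(type: KClass<T>, fields: Map<String, Any?>): T {
    val ctor = type.primaryConstructor
        ?: throw IllegalArgumentException("${type.simpleName} has no primary constructor")
    val args = mutableMapOf<KParameter, Any?>()
    for (param in ctor.parameters) {
        val name = param.name ?: continue
        if (name in fields) {
            args[param] = fields[name]
        } else if (!param.isOptional) {
            throw IllegalArgumentException("missing value for '$name'")
        }
    }
    return ctor.callBy(args)
}

fun main() {
    println(construct(WordCount::class, mapOf("word" to "flink", "count" to 3)))
    // prints WordCount(word=flink, count=3)
    println(construct(WordCount::class, mapOf("word" to "kotlin")))
    // prints WordCount(word=kotlin, count=1)
}
```

The point of the sketch is that all field values have to be available before the constructor is invoked, which is what a no-argument `createInstance` cannot provide.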



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
