[
https://issues.apache.org/jira/browse/FLINK-5964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16169780#comment-16169780
]
David Anderson commented on FLINK-5964:
---------------------------------------
Here's an example that compiles, but fails at runtime:
{code:java}
public class FlinkApp {
    companion object {
        @Throws(Exception::class)
        @JvmStatic public fun main(args: Array<String>) {
            val env = StreamExecutionEnvironment.getExecutionEnvironment()
            env
                .socketTextStream("localhost", 9999)
                .flatMap(LineSplitter())
                .keyBy { it.word }
                .sum("count")
                .print()
            env.execute()
        }

        data class Event(val word: String, val count: Int)

        class LineSplitter : FlatMapFunction<String, Event> {
            override fun flatMap(value: String, out: Collector<Event>) {
                val tokens = value.toLowerCase().split("\\W+".toRegex())
                    .dropLastWhile { it.isEmpty() }.toTypedArray()
                for (token in tokens) {
                    if (token.length > 0) {
                        out.collect(Event(token, 1))
                    }
                }
            }
        }
    }
}
{code}
{noformat}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.api.java.functions.KeySelector could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
    at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:1270)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:822)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:622)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:443)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getKeySelectorTypes(TypeExtractor.java:436)
    at org.apache.flink.streaming.api.datastream.KeyedStream.<init>(KeyedStream.java:108)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:263)
    at kotflink.FlinkApp$Companion.main(app.kt:18)
    at kotflink.FlinkApp.main(app.kt)
{noformat}
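One plausible reading of the trace: TypeExtractor recovers a function's input and key types from generic signatures via reflection, and the class generated for the Kotlin lambda passed to keyBy apparently does not carry them. A named class that implements KeySelector<Event, String> does record its type arguments. The sketch below illustrates that with plain JVM reflection. The KeySelector interface here is a hypothetical stand-in declared locally so the snippet runs without Flink, and WordKey and keySelectorTypeArgs are illustrative names, not part of Flink.
{code:java}
import java.lang.reflect.ParameterizedType

// Hypothetical stand-in for Flink's KeySelector<IN, KEY>, declared here
// only so that the sketch compiles and runs without Flink on the classpath.
interface KeySelector<IN, KEY> {
    fun getKey(value: IN): KEY
}

data class Event(val word: String, val count: Int)

// A named class: the type arguments (Event, String) are part of its
// generic signature and stay available to reflection at runtime.
class WordKey : KeySelector<Event, String> {
    override fun getKey(value: Event): String = value.word
}

// Returns the names of the KeySelector type arguments recorded on the
// selector's class, or null if no parameterized KeySelector is found.
fun keySelectorTypeArgs(selector: KeySelector<*, *>): List<String>? =
    selector.javaClass.genericInterfaces
        .filterIsInstance<ParameterizedType>()
        .firstOrNull { it.rawType == KeySelector::class.java }
        ?.actualTypeArguments
        ?.map { it.typeName }

fun main() {
    // Prints the two type arguments recovered from WordKey's signature.
    println(keySelectorTypeArgs(WordKey()))
}
{code}
In the program above, the analogous change would be to have WordKey implement org.apache.flink.api.java.functions.KeySelector and call .keyBy(WordKey()) instead of passing a lambda. This is untested against the job itself, and sum("count") on an immutable data class may still run into the limitation this issue describes.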
> Change TypeSerializers to allow construction of immutable types
> ---------------------------------------------------------------
>
> Key: FLINK-5964
> URL: https://issues.apache.org/jira/browse/FLINK-5964
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Affects Versions: 2.0.0, 1.1.4
> Reporter: Jayson Minard
> Priority: Minor
>
> If your programming language has many immutable types (with no default
> constructor), Flink forces you to create new read/write POJO versions of
> them; otherwise the system rejects the types. In Kotlin, for example,
> given a class and property values, we can determine which constructor to call
> and invoke it using knowledge of default values, nullable types, and which
> properties can be set during or after construction.
> Flink provides no opportunity to use this model because Serializers are
> littered with calls to `createInstance` that are not passed any values, so
> they have no opportunity to fully inflate the object on construction.
> This means that when you use Flink you throw away maybe hundreds of state
> objects (my worst case) and have to create Flink-only variations which
> becomes grunt work that adds no value.
> Currently TypeExtractor isn't extensible, and all of the special cases are
> hard-coded. The order in which it checks for type information should be
> configurable so that pluggable types can be added to the chain of analysis.
> For example, before `analyzePojo` is called, I could special-case a Kotlin
> class by returning a different TypeInformation instance. But I don't think
> that solves the whole problem, since other TypeSerializers make assumptions
> and call `createInstance` on other TypeSerializers without knowing how they
> would want to do the construction (in the Kotlin case it would be "tell me to
> construct my instance and give me the list of named fields and serializers to
> get their values, and let me decide what to do").
> What is the best idea for this change? With guidance, I can work on the PR.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)