[
https://issues.apache.org/jira/browse/FLINK-5964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897546#comment-15897546
]
Jayson Minard edited comment on FLINK-5964 at 3/6/17 4:01 PM:
--------------------------------------------------------------
[~StephanEwen] it happens when I try something like this older code (it was
removed from the current code, but I can put it back into a current example to
reproduce the exact error on Flink 1.4), which used Kotlin `Pair` instead of
`Tuple2` or a custom class for word count. I think the problem is in a call
that determines the lambda type, or the operator's type, which ends up failing
in `TypeExtractor.createTypeInfo(...)`:
{code}
inline fun <reified T : Any> typeInfo(): TypeInformation<T> =
    TypeExtractor.createTypeInfo(fullType<T>().type) as TypeInformation<T>

inline fun <T : Pair<KEY, Y>, reified KEY : Any, Y : Any> DataStream<T>.keyed(): KeyedStream<T, KEY> {
    return KeyedStream<T, KEY>(this, executionEnvironment.clean({ it: T -> it.first }), typeInfo<KEY>())
}

@JvmName("keyedStreamSummingPairKeyLong")
fun <KEY : Any, T : Pair<KEY, Long>> KeyedStream<T, KEY>.summed(): SingleOutputStreamOperator<T, *> {
    @Suppress("UNCHECKED_CAST")
    return this.reduceBy { one, two -> Pair(one.first, one.second + two.second) as T }
}

@JvmName("keyedStreamSummingPairKeyInt")
fun <KEY : Any, T : Pair<KEY, Int>> KeyedStream<T, KEY>.summed(): SingleOutputStreamOperator<T, *> {
    @Suppress("UNCHECKED_CAST")
    return this.reduceBy { one, two -> Pair(one.first, one.second + two.second) as T }
}

val counts = input.kMap { it.toLowerCase().split("""\W+""".toRegex()) }
    .kFlatMap { tokens, collector: Collector<Pair<String, Int>> ->
        tokens.filter { it.isNotBlank() }.forEach { word ->
            collector.collect(Pair(word, 1))
        }
    }
    .keyed().summed()
{code}
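For context, the `fullType<T>()` call in the snippet is presumably a reified super-type-token helper. A minimal, stdlib-only sketch of that idiom (the names `TypeRef` and `fullType` here are illustrative, not the actual helper) looks like:

```kotlin
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type

// Super-type-token idiom: an anonymous subclass preserves its generic argument,
// so the full parameterized type survives erasure and is readable via reflection.
abstract class TypeRef<T> {
    val type: Type =
        (javaClass.genericSuperclass as ParameterizedType).actualTypeArguments[0]
}

// `reified` lets the inline function materialize T into the anonymous subclass.
inline fun <reified T : Any> fullType(): TypeRef<T> = object : TypeRef<T>() {}

fun main() {
    // The captured type is the full Pair<String, Int>, not just the raw Pair class.
    println(fullType<Pair<String, Int>>().type)
}
```

It is the `java.lang.reflect.Type` produced this way that ends up passed to `TypeExtractor.createTypeInfo(...)` above.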
> Change TypeSerializers to allow construction of immutable types
> ---------------------------------------------------------------
>
> Key: FLINK-5964
> URL: https://issues.apache.org/jira/browse/FLINK-5964
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Affects Versions: 2.0.0, 1.1.4
> Reporter: Jayson Minard
> Priority: Minor
>
> If your programming language has many immutable types (with no default
> constructor), Flink forces you to create new versions of them as read/write
> POJOs; otherwise the types are rejected by the system. In Kotlin, for example,
> given a class and a set of property values, we can determine which constructor
> to call and invoke it using knowledge of default values, nullable types, and
> which properties can be set during or after construction.
> Flink provides no opportunity to use this model because the Serializers are
> littered with calls to `createInstance` that are not passed any values, so
> there is no opportunity to fully inflate the object at construction time.
> This means that when you use Flink you throw away perhaps hundreds of state
> classes (my worst case) and have to create Flink-only variations, which is
> grunt work that adds no value.
> Currently `TypeExtractor` isn't extensible, and all of the special cases are
> hard-coded. The order of checks for type information should be configurable so
> that pluggable types can be added into the chain of analysis.
> For example, before `analyzePojo` is called I could special-case a Kotlin
> class and return a different TypeInformation instance. But I don't think that
> solves the whole problem, since other TypeSerializers make assumptions and
> call `createInstance` on other TypeSerializers without knowing how those would
> want to do the construction (in the Kotlin case it would be "tell me to
> construct my instance, give me the list of named fields and serializers to
> get their values, and let me decide what to do").
> What is the best idea for this change? With guidance, I can work on the PR.
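The value-aware construction idea described above can be sketched in plain Kotlin. This is a hypothetical illustration, not Flink's actual serializer API: `ValueAwareFactory` and its `createInstance(fields)` signature are invented here to show how a serializer that receives named field values could build an immutable type, falling back to Kotlin default values for missing fields.

```kotlin
// An immutable Kotlin type: no default (no-arg) constructor exists,
// so a no-arg createInstance() could never produce a usable instance.
data class WordCount(val word: String, val count: Long = 0L)

// Hypothetical contract: the factory is told to construct the instance itself
// and is handed the decoded field values by name.
interface ValueAwareFactory<T> {
    fun createInstance(fields: Map<String, Any?>): T
}

class WordCountFactory : ValueAwareFactory<WordCount> {
    override fun createInstance(fields: Map<String, Any?>): WordCount =
        WordCount(
            word = fields["word"] as String,
            // A field missing from the serialized form falls back to the
            // Kotlin default value instead of failing.
            count = fields["count"] as? Long ?: 0L,
        )
}

fun main() {
    val factory = WordCountFactory()
    val wc = factory.createInstance(mapOf("word" to "flink", "count" to 3L))
    println(wc) // WordCount(word=flink, count=3)
}
```

In a real implementation the per-class logic in `WordCountFactory` would be derived reflectively from the primary constructor (default values, nullability), rather than written by hand.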
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)