[ 
https://issues.apache.org/jira/browse/FLINK-5964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897546#comment-15897546
 ] 

Jayson Minard edited comment on FLINK-5964 at 3/6/17 4:01 PM:
--------------------------------------------------------------

[~StephanEwen] it is when I try to make something like this older code (it was 
removed in current code but I can put it back into a current example to get the 
exact error on Flink 1.4) which was using Kotlin `Pair` instead of `Tuple2` or 
a custom class for Word Count.  I think the problem is in a call to figure out 
the Lambda type, or of the operator that ends up getting an error in 
`TypeExtractor.createTypeInfo(...)`

{code}
inline fun <reified T : Any> typeInfo(): TypeInformation<T> = 
TypeExtractor.createTypeInfo(fullType<T>().type) as TypeInformation<T>

inline fun <T: Pair<KEY, Y>, reified KEY: Any, Y: Any>  DataStream<T>.keyed(): 
KeyedStream<T, KEY>  {
    return KeyedStream<T, KEY>(this, executionEnvironment.clean({ it:T -> 
it.first }), typeInfo<KEY>())
}

@JvmName("keyedStreamSummingPairKeyLong")
fun <KEY: Any, T: Pair<KEY, Long>>  KeyedStream<T, KEY>.summed(): 
SingleOutputStreamOperator<T, *> {
    @Suppress("UNCHECKED_CAST")
    return this.reduceBy { one, two -> Pair(one.first, one.second + two.second) 
 as T }
}

@JvmName("keyedStreamSummingPairKeyInt")
fun <KEY: Any, T: Pair<KEY, Int>>  KeyedStream<T, KEY>.summed(): 
SingleOutputStreamOperator<T, *> {
    @Suppress("UNCHECKED_CAST")
    return this.reduceBy { one, two -> Pair(one.first, one.second + two.second) 
 as T }

  val counts = input.kMap { it.toLowerCase().split("""\W+""".toRegex()) }
                .kFlatMap { tokens, collector: Collector<Pair<String, Int>> ->
                    tokens.filter { it.isNotBlank() }.forEach { word ->
                        collector.collect(Pair(word, 1))
                    }
                }
                .keyed().summed()
{code}




was (Author: jayson.minard):
[~StephanEwen] it is when I try to make something like this older code (it was 
removed in current code but I can put it back into a current example to get the 
exact error on Flink 1.4).  I think it is a call to figure out the Lambda type, 
or of the operator that ends up getting an error in 
`TypeExtractor.createTypeInfo(...)`

{code}
inline fun <reified T : Any> typeInfo(): TypeInformation<T> = 
TypeExtractor.createTypeInfo(fullType<T>().type) as TypeInformation<T>

inline fun <T: Pair<KEY, Y>, reified KEY: Any, Y: Any>  DataStream<T>.keyed(): 
KeyedStream<T, KEY>  {
    return KeyedStream<T, KEY>(this, executionEnvironment.clean({ it:T -> 
it.first }), typeInfo<KEY>())
}

@JvmName("keyedStreamSummingPairKeyLong")
fun <KEY: Any, T: Pair<KEY, Long>>  KeyedStream<T, KEY>.summed(): 
SingleOutputStreamOperator<T, *> {
    @Suppress("UNCHECKED_CAST")
    return this.reduceBy { one, two -> Pair(one.first, one.second + two.second) 
 as T }
}

@JvmName("keyedStreamSummingPairKeyInt")
fun <KEY: Any, T: Pair<KEY, Int>>  KeyedStream<T, KEY>.summed(): 
SingleOutputStreamOperator<T, *> {
    @Suppress("UNCHECKED_CAST")
    return this.reduceBy { one, two -> Pair(one.first, one.second + two.second) 
 as T }

  val counts = input.kMap { it.toLowerCase().split("""\W+""".toRegex()) }
                .kFlatMap { tokens, collector: Collector<Pair<String, Int>> ->
                    tokens.filter { it.isNotBlank() }.forEach { word ->
                        collector.collect(Pair(word, 1))
                    }
                }
                .keyed().summed()
{code}



> Change TypeSerializers to allow construction of immutable types
> ---------------------------------------------------------------
>
>                 Key: FLINK-5964
>                 URL: https://issues.apache.org/jira/browse/FLINK-5964
>             Project: Flink
>          Issue Type: Improvement
>          Components: Core
>    Affects Versions: 2.0.0, 1.1.4
>            Reporter: Jayson Minard
>            Priority: Minor
>
> If your programming language has a lot of Immutable types (and with no 
> default constructor) Flink forces you to create new versions as read/write 
> POJO otherwise the types are rejected by the system.  In Kotlin for example, 
> given a class and property values we can determine which constructor to call 
> and invoke it using knowledge of default values, nullable types and which 
> properties can be set in construction or after construction.
> Flink provides no opportunity to use this model because Serializers are 
> littered with calls to `createInstance` that are not passed any values so 
> have no opportunity to fully inflate the object on construction.  
> This means that when you use Flink you throw away maybe hundreds of state 
> objects (my worst case) and have to create Flink-only variations which 
> becomes grunt work that adds no value.  
> Currently TypeExtractor isn't extendable, and all of the special cases are 
> hard coded.  It should be configured the order of checking for type 
> information so that pluggable types can be added into the chain of analysis.  
> For example before `analyzePojo` is called I could special case a Kotlin 
> class returning a different TypeInformation instance.  But I don't think that 
> solves the whole problem since other TypeSerializers make assumptions and 
> call `createInstance` on other TypeSerializers without knowing how they would 
> want to do the construction (in the Kotlin case it would be "tell me to 
> construct my instance and give me the list of named fields and serializers to 
> get their values and let me decide what to do).
> What is the best idea for this change?  With guidance, I can work on the PR.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to