[ 
https://issues.apache.org/jira/browse/FLINK-14354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16965058#comment-16965058
 ] 

Seth Wiesman commented on FLINK-14354:
--------------------------------------

-1 I don't believe this is an appropriate change. 

 

1) This would break backwards compatibility.

2) RichFunction#open serves more than one purpose. Along with registering
state descriptors, users also perform arbitrary life-cycle work there
(spawning threads, creating DB connections, etc.). A state reader likely
wants to perform a different set of background actions.

3) I don't believe it is reasonable to assume that a state reader application 
has access to the operator class that was used to write the savepoint.
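
To illustrate point 2, here is a minimal sketch using stand-in classes. The
names WriterFunction, InheritingReader, StandaloneReader, and sideEffects are
hypothetical and are not Flink types; the sketch only assumes that an
operator's open() mixes state registration with other life-cycle work.

```scala
import scala.collection.mutable.ListBuffer

object LifecycleSketch {
  // Records side effects so they can be inspected afterwards.
  val sideEffects: ListBuffer[String] = ListBuffer.empty

  // Hypothetical operator (not a Flink class): open() registers state
  // and also starts unrelated resources.
  class WriterFunction {
    var stateName: String = ""

    def open(): Unit = {
      stateName = "boolean-state"                // state registration
      sideEffects += "opened db connection"      // unrelated life-cycle work
      sideEffects += "started background thread" // unrelated life-cycle work
    }
  }

  // A reader that inherits from the operator inherits all of open(),
  // including the side effects it has no use for.
  class InheritingReader extends WriterFunction

  // A standalone reader declares only the state it wants to read.
  class StandaloneReader {
    var stateName: String = ""

    def open(): Unit = {
      stateName = "boolean-state"
    }
  }
}
```

Calling open() on InheritingReader records both side effects, while
StandaloneReader records none; that is the difference in background actions
described above.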

> Provide interfaces instead of abstract classes in 
> org.apache.flink.state.api.functions
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-14354
>                 URL: https://issues.apache.org/jira/browse/FLINK-14354
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / State Processor
>            Reporter: Mitch Wasson
>            Priority: Minor
>
> I've started using the new state processing API in Flink 1.9. Super useful 
> and works great for the most part.
> However, I think there is an opportunity to simplify implementations that 
> use the API. My request, to enable these simplifications, is to provide 
> interfaces instead of (or in addition to) abstract classes in 
> org.apache.flink.state.api.functions, and then have the state processing API 
> require those interfaces.
> My use case involves maintaining and processing keyed state. This is 
> accomplished with a KeyedProcessFunction:
> class BooleanProcess extends KeyedProcessFunction[String, String, String] {
>   var bool: ValueState[Boolean] = _
>
>   override def open(parameters: Configuration): Unit = {
>     bool = getRuntimeContext.getState(
>       new ValueStateDescriptor("boolean-state", classOf[Boolean]))
>   }
>
>   override def processElement(
>       value: String,
>       ctx: KeyedProcessFunction[String, String, String]#Context,
>       out: Collector[String]): Unit = {
>     if (bool.value) {
>       out.collect(value)
>     } else {
>       if (Math.random < 0.005) {
>         bool.update(true)
>         out.collect(value)
>       }
>     }
>   }
> }
>  
> I then use a KeyedStateReaderFunction like this to inspect 
> savepoints/checkpoints:
>  
> class BooleanProcessStateReader
>     extends KeyedStateReaderFunction[String, String] {
>   var bool: ValueState[Boolean] = _
>
>   override def open(parameters: Configuration): Unit = {
>     bool = getRuntimeContext.getState(
>       new ValueStateDescriptor("boolean-state", classOf[Boolean]))
>   }
>
>   override def readKey(
>       key: String,
>       ctx: KeyedStateReaderFunction.Context,
>       out: Collector[String]): Unit = {
>     out.collect(key)
>   }
> }
> Ideally, I would like my KeyedStateReaderFunction to look like this:
> class BooleanProcessStateReader extends BooleanProcess
>     with KeyedStateReaderFunction[String, String] {
>   override def readKey(
>       key: String,
>       ctx: KeyedStateReaderFunction.Context,
>       out: Collector[String]): Unit = {
>     out.collect(key)
>   }
> }
> However, this can't be done with the current API due to Java's single 
> inheritance and KeyedStateReaderFunction being an abstract class.
> The code savings are rather trivial in this example. However, it makes the 
> state reader much easier to maintain. It would automatically inherit state 
> and life cycle methods from the class whose state it is inspecting.

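For reference, the single-inheritance constraint described in the issue can
be sketched with stand-in types. ProcessBase, ReaderBase, and ReaderContract
are hypothetical names and are not the Flink classes; the sketch only assumes
that both base types are abstract classes today and shows what a trait-based
reader contract would permit.

```scala
object InheritanceSketch {
  // Stand-ins for illustration only; these are NOT the Flink types.
  abstract class ProcessBase {
    def open(): String = "state registered"
  }

  abstract class ReaderBase {
    def readKey(key: String): String
  }

  // Does not compile: a class can extend only one class, so an abstract
  // class cannot be mixed in alongside another superclass.
  // class Reader extends ProcessBase with ReaderBase

  // If the reader contract were a trait (an interface), mixing it in works.
  trait ReaderContract {
    def readKey(key: String): String
  }

  class BooleanProcess extends ProcessBase

  // Inherits open() from BooleanProcess and implements the reader contract.
  class BooleanProcessStateReader extends BooleanProcess with ReaderContract {
    override def readKey(key: String): String = key
  }
}
```

With the trait in place, BooleanProcessStateReader picks up open() from
BooleanProcess unchanged, which is both the convenience the issue asks for
and the coupling the comment above objects to.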


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
