Mitch Wasson created FLINK-14354:
------------------------------------

             Summary: Provide interfaces instead of abstract classes in 
org.apache.flink.state.api.functions
                 Key: FLINK-14354
                 URL: https://issues.apache.org/jira/browse/FLINK-14354
             Project: Flink
          Issue Type: Improvement
            Reporter: Mitch Wasson


I've started using the new state processing API in Flink 1.9. Super useful and 
works great for the most part.

However, I think there is an opportunity to simplify implementations that use 
the API. My request is to provide interfaces instead of (or in addition to) the 
abstract classes in org.apache.flink.state.api.functions, and to have the state 
processing API accept those interfaces.

My use case involves maintaining and processing keyed state. This is 
accomplished with a KeyedProcessFunction:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class BooleanProcess extends KeyedProcessFunction[String, String, String] {

   // Per-key flag, registered in open()
   var bool: ValueState[Boolean] = _

   override def open(parameters: Configuration): Unit = {
     bool = getRuntimeContext.getState(
       new ValueStateDescriptor("boolean-state", classOf[Boolean]))
   }

   override def processElement(
       value: String,
       ctx: KeyedProcessFunction[String, String, String]#Context,
       out: Collector[String]): Unit = {

     if (bool.value) {
       // Flag already set for this key: forward the element
       out.collect(value)
     } else {
       // Otherwise set the flag with a small probability, then forward
       if (Math.random < 0.005) {
         bool.update(true)
         out.collect(value)
       }
     }
   }
}


 I then use a KeyedStateReaderFunction like this to inspect 
savepoints/checkpoints:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

class BooleanProcessStateReader extends KeyedStateReaderFunction[String, String] {

   // Duplicates the state declaration from BooleanProcess
   var bool: ValueState[Boolean] = _

   override def open(parameters: Configuration): Unit = {
     bool = getRuntimeContext.getState(
       new ValueStateDescriptor("boolean-state", classOf[Boolean]))
   }

   override def readKey(
       key: String,
       ctx: KeyedStateReaderFunction.Context,
       out: Collector[String]): Unit = {
     out.collect(key)
   }
}

 

Ideally, I would like my KeyedStateReaderFunction to look like this:

class BooleanProcessStateReader extends BooleanProcess
    with KeyedStateReaderFunction[String, String] {

   override def readKey(
       key: String,
       ctx: KeyedStateReaderFunction.Context,
       out: Collector[String]): Unit = {
     out.collect(key)
   }
}

However, this can't be done with the current API, because Java allows only 
single class inheritance and KeyedStateReaderFunction is an abstract class.

The code savings are rather trivial in this example. However, the change would 
make the state reader much easier to maintain: it would automatically inherit 
the state declarations and lifecycle methods from the class whose state it is 
inspecting.
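
To illustrate the shape of the request without depending on Flink, here is a 
minimal, self-contained sketch. Every name in it (ProcessFunctionLike, 
ReaderContract, BooleanProcessLike, BooleanProcessReaderLike, Demo) is a 
hypothetical stand-in and not part of the Flink API. It only shows that once 
the reader contract is an interface (a Scala trait here), the reader can 
extend the processing class and reuse its open() and state fields.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a rich function base class (NOT the Flink API).
abstract class ProcessFunctionLike[I, O] {
  def open(): Unit = ()
  def processElement(value: I, out: mutable.Buffer[O]): Unit
}

// Hypothetical reader contract expressed as an interface rather than an
// abstract class, which is what the issue asks for.
trait ReaderContract[K, O] {
  def open(): Unit
  def readKey(key: K, out: mutable.Buffer[O]): Unit
}

// Stand-in for BooleanProcess: declares its state once, in open().
class BooleanProcessLike extends ProcessFunctionLike[String, String] {
  var stateName: String = _

  override def open(): Unit = {
    stateName = "boolean-state"
  }

  override def processElement(value: String, out: mutable.Buffer[String]): Unit =
    out += value
}

// The reader inherits open() and stateName from the class it inspects and
// only adds readKey. If ReaderContract were an abstract class, this
// declaration would not compile, because a class has a single superclass.
class BooleanProcessReaderLike
    extends BooleanProcessLike
    with ReaderContract[String, String] {

  override def readKey(key: String, out: mutable.Buffer[String]): Unit =
    out += s"$key:$stateName"
}

object Demo {
  def main(args: Array[String]): Unit = {
    val reader = new BooleanProcessReaderLike
    reader.open() // inherited lifecycle method sets up the state
    val out = mutable.Buffer.empty[String]
    reader.readKey("k1", out)
    println(out.mkString(","))
  }
}
```

The inherited concrete open() satisfies the abstract open() declared by the 
trait, so the reader does not have to repeat the state registration.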

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
