[ https://issues.apache.org/jira/browse/FLINK-14354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mitch Wasson updated FLINK-14354:
---------------------------------
Description:
I've started using the new state processing API in Flink 1.9. It's super useful
and works great for the most part.
However, I think there is an opportunity to simplify implementations that use
the API. To enable these simplifications, I'd like to request that
org.apache.flink.state.api.functions provide interfaces instead of (or in
addition to) abstract classes, and that the state processing API accept those
interfaces.
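A minimal sketch of what the request could look like on the API side, using simplified stand-in types rather than Flink's real classes. `SimpleCollector`, `KeyedStateReader`, `KeyedStateReaderBase`, `StateApiSketch.readKeyed` and `UpperCaseReader` are all hypothetical names, not part of Flink: the reader contract becomes a trait (Scala's interface), the abstract class is kept and simply implements it, and the entry point asks only for the trait.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for org.apache.flink.util.Collector.
trait SimpleCollector[T] {
  def collect(record: T): Unit
}

// Hypothetical interface form of a keyed state reader (not Flink's API).
// Flink's real reader also receives a Context; it is omitted here.
trait KeyedStateReader[K, OUT] {
  def open(): Unit = () // lifecycle hook with a no-op default
  def readKey(key: K, out: SimpleCollector[OUT]): Unit
}

// Existing-style abstract class kept for compatibility: it simply
// implements the new interface, so current subclasses keep working.
abstract class KeyedStateReaderBase[K, OUT] extends KeyedStateReader[K, OUT]

object StateApiSketch {
  // Hypothetical entry point that requires only the interface, so any class
  // mixing in KeyedStateReader is accepted, whatever its superclass.
  def readKeyed[K, OUT](keys: Seq[K], reader: KeyedStateReader[K, OUT]): List[OUT] = {
    val buffer = ListBuffer.empty[OUT]
    val out = new SimpleCollector[OUT] {
      def collect(record: OUT): Unit = buffer += record
    }
    reader.open()
    keys.foreach(key => reader.readKey(key, out))
    buffer.toList
  }
}

// Example reader written against the abstract class.
class UpperCaseReader extends KeyedStateReaderBase[String, String] {
  def readKey(key: String, out: SimpleCollector[String]): Unit =
    out.collect(key.toUpperCase)
}
```

Because `readKeyed` only asks for the trait, readers written against the abstract class keep working unchanged, which corresponds to the "in addition to" option.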
My use case involves maintaining and processing keyed state. This is
accomplished with a KeyedProcessFunction:
```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class BooleanProcess extends KeyedProcessFunction[String, String, String] {
  // Per-key flag; once set, every later element for that key is emitted.
  var bool: ValueState[Boolean] = _

  override def open(parameters: Configuration): Unit = {
    bool = getRuntimeContext.getState(
      new ValueStateDescriptor("boolean-state", classOf[Boolean]))
  }

  override def processElement(
      value: String,
      ctx: KeyedProcessFunction[String, String, String]#Context,
      out: Collector[String]): Unit = {
    if (bool.value) {
      out.collect(value)
    } else if (Math.random < 0.005) {
      // Flag this key with probability 0.5% and emit the current element.
      bool.update(true)
      out.collect(value)
    }
  }
}
```
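To make the keyed-state semantics of processElement concrete, here is a Flink-free sketch. `BooleanProcessSim` is a hypothetical model, not Flink code: a mutable Map stands in for the per-key ValueState[Boolean], and the random draw is injected as a function (an assumption made so the behaviour is deterministic; the original uses Math.random < 0.005).

```scala
import scala.collection.mutable

// Hypothetical, Flink-free model of BooleanProcess: `flags` plays the role of
// the per-key ValueState[Boolean], and `draw` replaces Math.random so the
// behaviour can be tested deterministically.
class BooleanProcessSim(draw: () => Double, threshold: Double = 0.005) {
  private val flags = mutable.Map.empty[String, Boolean]

  // Returns the value that processElement would pass to out.collect, if any.
  def processElement(key: String, value: String): Option[String] =
    if (flags.getOrElse(key, false)) {
      Some(value) // key already flagged: always emit
    } else if (draw() < threshold) {
      flags(key) = true // flag this key; later elements are emitted too
      Some(value)
    } else {
      None
    }

  def flaggedKeys: Set[String] = flags.collect { case (k, true) => k }.toSet
}
```

Each key has its own flag, and no random draw is taken for a key that is already flagged.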
I then use a KeyedStateReaderFunction like this to inspect
savepoints/checkpoints:
```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

class BooleanProcessStateReader extends KeyedStateReaderFunction[String, String] {
  // Duplicates BooleanProcess's state declaration; name and type must match.
  var bool: ValueState[Boolean] = _

  override def open(parameters: Configuration): Unit = {
    bool = getRuntimeContext.getState(
      new ValueStateDescriptor("boolean-state", classOf[Boolean]))
  }

  override def readKey(
      key: String,
      ctx: KeyedStateReaderFunction.Context,
      out: Collector[String]): Unit = {
    out.collect(key)
  }
}
```
Ideally, I would like my KeyedStateReaderFunction to look like this:
```scala
// Desired: inherit state and open() from BooleanProcess and add readKey.
// This only compiles if KeyedStateReaderFunction is an interface; Scala
// mixes interfaces in with `with` (it has no `implements` keyword).
class BooleanProcessStateReader
    extends BooleanProcess
    with KeyedStateReaderFunction[String, String] {

  override def readKey(
      key: String,
      ctx: KeyedStateReaderFunction.Context,
      out: Collector[String]): Unit = {
    out.collect(key)
  }
}
```
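However, this can't be done with the current API: KeyedStateReaderFunction is
an abstract class, and because Scala, like Java, allows a class to extend only
one class, BooleanProcessStateReader cannot extend both BooleanProcess and
KeyedStateReaderFunction.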
The code savings are trivial in this example, but the state reader would be
much easier to maintain: it would automatically inherit the state declarations
and lifecycle methods (such as open()) from the class whose state it inspects,
instead of duplicating them.
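The inheritance problem can be reproduced without Flink. In this sketch every name (`RichFunctionLike`, `ProcessFunctionLike`, `StateReaderLike`, `BooleanProcessLike`, `BooleanProcessStateReaderLike`) is a hypothetical stand-in, not a Flink class. Because the reader contract is a trait, the reader extends the process class and mixes the contract in, inheriting the state declaration and open() unchanged; declaring `StateReaderLike` as an abstract class instead would make the `with` clause a compile error.

```scala
// Hypothetical stand-in for Flink's rich-function lifecycle.
abstract class RichFunctionLike {
  def open(): Unit = ()
}

// Hypothetical stand-in for KeyedProcessFunction (an abstract class).
abstract class ProcessFunctionLike[K, IN, OUT] extends RichFunctionLike {
  def processElement(value: IN, out: OUT => Unit): Unit
}

// Hypothetical *interface* form of KeyedStateReaderFunction (the request):
// as a trait it can be mixed into any class hierarchy.
trait StateReaderLike[K, OUT] {
  def open(): Unit
  def readKey(key: K, out: OUT => Unit): Unit
}

class BooleanProcessLike extends ProcessFunctionLike[String, String, String] {
  // State is declared once here; subclasses inherit it.
  var stateName: String = _
  override def open(): Unit = { stateName = "boolean-state" }
  def processElement(value: String, out: String => Unit): Unit = out(value)
}

// Compiles because StateReaderLike is a trait: the abstract open() it
// declares is satisfied by the concrete open() inherited from BooleanProcessLike.
class BooleanProcessStateReaderLike
    extends BooleanProcessLike
    with StateReaderLike[String, String] {
  def readKey(key: String, out: String => Unit): Unit = out(key)
}
```

The reader only adds readKey; if the state declaration in BooleanProcessLike changes, the reader picks the change up automatically.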
> Provide interfaces instead of abstract classes in
> org.apache.flink.state.api.functions
> --------------------------------------------------------------------------------------
>
> Key: FLINK-14354
> URL: https://issues.apache.org/jira/browse/FLINK-14354
> Project: Flink
> Issue Type: Improvement
> Reporter: Mitch Wasson
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)