[
https://issues.apache.org/jira/browse/FLINK-14354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16965058#comment-16965058
]
Seth Wiesman commented on FLINK-14354:
--------------------------------------
-1 I don't believe this is an appropriate change.
1) This would break backwards compatibility.
2) RichFunction#open is overloaded with responsibilities. Along with registering
state descriptors, users also perform arbitrary life-cycle actions (spawning
threads, creating db connections, etc.). State readers likely want to perform a
different set of background actions.
3) I don't believe it is reasonable to assume that a state reader application
has access to the operator class that was used to write the savepoint.
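
To illustrate point 2, here is a minimal, Flink-free sketch. WriterFunction,
InheritingReader and StandaloneReader are hypothetical stand-ins, not classes
from the State Processor API, and the recorded strings merely stand in for
whatever side effects a production open() might perform.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a streaming operator; not a Flink class.
class WriterFunction {
  // Records every action performed during open().
  val sideEffects: ListBuffer[String] = ListBuffer.empty

  def open(): Unit = {
    sideEffects += "register state descriptor"
    sideEffects += "open db connection"      // life-cycle work unrelated to state
    sideEffects += "spawn background thread" // life-cycle work unrelated to state
  }
}

// A reader that reuses the writer's open() inherits all of its actions.
class InheritingReader extends WriterFunction

// A reader with its own open() does only what reading requires.
class StandaloneReader {
  val sideEffects: ListBuffer[String] = ListBuffer.empty
  def open(): Unit = sideEffects += "register state descriptor"
}

object OpenSideEffectsDemo {
  def inherited(): List[String] = {
    val reader = new InheritingReader
    reader.open()
    reader.sideEffects.toList
  }

  def standalone(): List[String] = {
    val reader = new StandaloneReader
    reader.open()
    reader.sideEffects.toList
  }
}
```

In this sketch the inheriting reader ends up performing the db-connection and
thread actions as well, even though it only needs the state descriptor.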
> Provide interfaces instead of abstract classes in
> org.apache.flink.state.api.functions
> --------------------------------------------------------------------------------------
>
> Key: FLINK-14354
> URL: https://issues.apache.org/jira/browse/FLINK-14354
> Project: Flink
> Issue Type: Improvement
> Components: API / State Processor
> Reporter: Mitch Wasson
> Priority: Minor
>
> I've started using the new state processing API in Flink 1.9. Super useful
> and works great for the most part.
> However, I think there is an opportunity to simplify implementations that use
> the API. My request, to enable these simplifications, is to provide interfaces
> instead of (or in addition to) abstract classes in
> org.apache.flink.state.api.functions, and then have the state processing API
> require those interfaces.
> My use case involves maintaining and processing keyed state. This is
> accomplished with a KeyedProcessFunction:
>
> class BooleanProcess extends KeyedProcessFunction[String, String, String] {
>   var bool: ValueState[Boolean] = _
>
>   override def open(parameters: Configuration): Unit = {
>     bool = getRuntimeContext.getState(
>       new ValueStateDescriptor("boolean-state", classOf[Boolean]))
>   }
>
>   override def processElement(
>       value: String,
>       ctx: KeyedProcessFunction[String, String, String]#Context,
>       out: Collector[String]): Unit = {
>     if (bool.value) {
>       out.collect(value)
>     } else {
>       if (Math.random < 0.005) {
>         bool.update(true)
>         out.collect(value)
>       }
>     }
>   }
> }
>
> I then use a KeyedStateReaderFunction like this to inspect
> savepoints/checkpoints:
>
> class BooleanProcessStateReader extends KeyedStateReaderFunction[String, String] {
>   var bool: ValueState[Boolean] = _
>
>   override def open(parameters: Configuration): Unit = {
>     bool = getRuntimeContext.getState(
>       new ValueStateDescriptor("boolean-state", classOf[Boolean]))
>   }
>
>   override def readKey(
>       key: String,
>       ctx: KeyedStateReaderFunction.Context,
>       out: Collector[String]): Unit = {
>     out.collect(key)
>   }
> }
>
> Ideally, I would like my KeyedStateReaderFunction to look like this:
>
> class BooleanProcessStateReader extends BooleanProcess
>     with KeyedStateReaderFunction[String, String] {
>   override def readKey(
>       key: String,
>       ctx: KeyedStateReaderFunction.Context,
>       out: Collector[String]): Unit = {
>     out.collect(key)
>   }
> }
>
> However, this can't be done with the current API due to Java's single
> inheritance and KeyedStateReaderFunction being an abstract class.
> The code savings are rather trivial in this example. However, it makes the
> state reader much easier to maintain. It would automatically inherit state
> and life cycle methods from the class whose state it is inspecting.
>
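
The single-inheritance constraint raised in the description can be sketched
without Flink. Every type below is a hypothetical stand-in introduced for
illustration; ReaderAsClass models a reader base declared as an abstract class,
and ReaderAsTrait models the same contract declared as an interface.

```scala
// Hypothetical stand-ins; none of these are Flink classes.
abstract class ProcessFunctionLike {
  def open(): String = "state registered"
}

// Reader contract modeled as an abstract class.
abstract class ReaderAsClass {
  def readKey(key: String): String
}

// The same contract modeled as a trait (Scala's counterpart to a Java interface).
trait ReaderAsTrait {
  def readKey(key: String): String
}

class BooleanProcessLike extends ProcessFunctionLike

// Does not compile: only one class can be extended, and an abstract class
// cannot be mixed in with `with`.
// class BrokenReader extends BooleanProcessLike with ReaderAsClass { ... }

// Compiles: one superclass plus any number of traits.
class Reader extends BooleanProcessLike with ReaderAsTrait {
  // Reuses the inherited open() while implementing the reader contract.
  override def readKey(key: String): String = s"$key: ${open()}"
}

object SingleInheritanceDemo {
  def run(): String = new Reader().readKey("k")
}
```

This shows only the language-level constraint. Whether a reader should inherit
the writer's life-cycle methods is a separate question, discussed in the comment
above.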
--
This message was sent by Atlassian Jira
(v8.3.4#803005)