[
https://issues.apache.org/jira/browse/FLINK-14354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16966799#comment-16966799
]
Mitch Wasson commented on FLINK-14354:
--------------------------------------
??1) This would break backwards compatibility.??
I think this technique would preserve backward compatibility:
In the example I give, backward compatibility problems are due to the class
hierarchies being forced through the abstract class {{AbstractRichFunction}}.
{{ExistingSavepoint::readKeyedState}} accepts a {{KeyedStateReaderFunction}}.
{{KeyedStateReaderFunction}} is an abstract class but it doesn't hold any
state. It is forced to be an abstract class since it extends
{{AbstractRichFunction}} which does have state (an instance of
{{RuntimeContext}}).
The API for {{ExistingSavepoint::readKeyedState}} could be changed to receive a
{{KeyedStateReaderFunctionInterface}} that implements {{RichFunction}}. This
cuts out {{AbstractRichFunction}} without sacrificing the interface available.
Then {{KeyedStateReaderFunction}} would implement
{{KeyedStateReaderFunctionInterface}} so it could continue to be used as-is.
??2) RichFunction#open is an overloaded method. Along with registering state
descriptors users also perform arbitrary life-cycle events (spawning threads,
creating db connections, etc). State reads likely want to perform a different
set of background actions.??
They would still be able to. Just override {{RichFunction#open}}. Also, nothing
forces the state reader applications to extend the operator class. The current
implementation pattern is still possible.
??3) I don't believe it is reasonable to assume that a state reader application
has access to the operator class that was used to write the savepoint.??
There is no assumption that the state reader application has access to the
operator class. As I said above, nothing forces extending the operator class
and the current implementation pattern is still possible.
> Provide interfaces instead of abstract classes in
> org.apache.flink.state.api.functions
> --------------------------------------------------------------------------------------
>
> Key: FLINK-14354
> URL: https://issues.apache.org/jira/browse/FLINK-14354
> Project: Flink
> Issue Type: Improvement
> Components: API / State Processor
> Reporter: Mitch Wasson
> Priority: Minor
>
> I've started using the new state processing API in Flink 1.9. Super useful
> and works great for the most part.
> However, I think there is opportunity to simplify implementations that use
> the API. My request to enable these simplifications is to provides interfaces
> instead of (or in addition to) abstract classes in
> org.apache.flink.state.api.functions. Then have the state processing API
> require those interfaces.
> My use case involves maintaining and processing keyed state. This is
> accomplished with a KeyedProcessFunction:
> {color:#cc7832}class {color}BooleanProcess {color:#cc7832}extends
> {color}KeyedProcessFunction[{color:#4e807d}String{color}{color:#cc7832},
> {color}{color:#4e807d}String{color}{color:#cc7832},
> {color}{color:#4e807d}String{color}] {
> {color:#cc7832}var {color}{color:#9876aa}bool{color}:
> ValueState[{color:#cc7832}Boolean{color}] = _
> {color:#cc7832}override def {color}{color:#ffc66d}open{color}(parameters:
> Configuration) {
> {color:#9876aa}bool {color}=
> getRuntimeContext.getState({color:#cc7832}new
> {color}ValueStateDescriptor({color:#6a8759}"boolean-state"{color}{color:#cc7832},
> {color}classOf[{color:#cc7832}Boolean{color}]))
> }
> {color:#cc7832}override def
> {color}{color:#ffc66d}processElement{color}(value:
> {color:#4e807d}String{color}{color:#cc7832}, {color}ctx:
> KeyedProcessFunction[{color:#4e807d}String{color}{color:#cc7832},
> {color}{color:#4e807d}String{color}{color:#cc7832},
> {color}{color:#4e807d}String{color}]#Context{color:#cc7832}, {color}out:
> Collector[{color:#4e807d}String{color}]): {color:#cc7832}Unit {color}= {
> {color:#cc7832}if {color}({color:#9876aa}bool{color}.value) {
> out.collect(value)
> } {color:#cc7832}else {color}{
> {color:#cc7832}if {color}(Math.random < {color:#6897bb}0.005{color}) {
> {color:#9876aa}bool{color}.update({color:#cc7832}true{color})
> out.collect(value)
> }
> }
> }
> }
>
> I then use a KeyedStateReaderFunction like this to inspect
> savepoints/checkpoints:
>
> {color:#cc7832}class {color}BooleanProcessStateReader {color:#cc7832}extends
> {color}KeyedStateReaderFunction[{color:#4e807d}String{color}{color:#cc7832},
> {color}{color:#4e807d}String{color}] {
> {color:#cc7832}var {color}{color:#9876aa}bool{color}:
> ValueState[{color:#cc7832}Boolean{color}] = _
> {color:#cc7832}override def {color}{color:#ffc66d}open{color}(parameters:
> Configuration) {
> {color:#9876aa}bool {color}=
> getRuntimeContext.getState({color:#cc7832}new
> {color}ValueStateDescriptor({color:#6a8759}"boolean-state"{color}{color:#cc7832},
> {color}classOf[{color:#cc7832}Boolean{color}]))
> }
> {color:#cc7832}override def {color}{color:#ffc66d}readKey{color}(key:
> {color:#4e807d}String{color}{color:#cc7832}, {color}ctx:
> KeyedStateReaderFunction.Context{color:#cc7832}, {color}out:
> Collector[{color:#4e807d}String{color}]): {color:#cc7832}Unit {color}=
> { out.collect(key) }
> }
> Ideally, I would like my KeyedStateReaderFunction to look like this:
> {color:#cc7832}class {color}BooleanProcessStateReader {color:#cc7832}extends
> BooleanProcess{color} implements
> KeyedStateReaderFunction[String{color:#cc7832},
> {color}{color:#4e807d}String{color}] {
> {color:#cc7832}override def {color}{color:#ffc66d}readKey{color}(key:
> {color:#4e807d}String{color}{color:#cc7832}, {color}ctx:
> KeyedStateReaderFunction.Context{color:#cc7832}, {color}out:
> Collector[{color:#4e807d}String{color}]): {color:#cc7832}Unit {color}=
> { out.collect(key) }
> }
> However, this can't be done with the current API due Java's single
> inheritance and KeyedStateReaderFunction being an abstract class.
> The code savings are rather trivial in this example. However, it makes the
> state reader much easier to maintain. It would automatically inherit state
> and life cycle methods from the class whose state it is inspecting.
>
>
>
>
>
>
>
>
>
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)