[ 
https://issues.apache.org/jira/browse/FLINK-10343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614932#comment-16614932
 ] 

aitozi edited comment on FLINK-10343 at 9/14/18 2:54 PM:
---------------------------------------------------------

Hi [~kkl0u],

I think having users implement a specific operator themselves is also a good 
approach, but it would be better if we could wrap that logic for them. We could 
make this an optional (maybe pluggable) cache. I will think it over in more 
detail later.

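As for your second point, that "keyed state is written before the function 
state", I think there is a small misunderstanding. The order can be seen in 
AbstractStreamOperator.java: the function's snapshot method is invoked first, 
and only afterwards is the keyed state backend snapshotted.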
{code:java}
// Invoke the stateful function's snapshot method.
snapshotState(snapshotContext);

snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

if (null != operatorStateBackend) {
    snapshotInProgress.setOperatorStateManagedFuture(
        operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}

if (null != keyedStateBackend) {
    snapshotInProgress.setKeyedStateManagedFuture(
        keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
{code}
Hi [~azagrebin],

My goal is to combine the heap backend and the RocksDB backend to handle state 
that is larger than the memory capacity. In our production scenario, when using 
reducing state and similar state types, each incoming element reads its state 
from the state backend and writes the updated value back. Because the state is 
fairly large, we use the RocksDB backend, and most of the time is spent on 
serialization and deserialization, which frequently leads to backpressure. So I 
want to expose the ability to put a cache on top of the RocksDB backend to 
reduce serialization and deserialization. To preserve exactly-once semantics, 
we have to write the data back to state when a snapshot is taken, by looping 
over the cache entries and setting the current key for each one.
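
A minimal sketch of the caching idea described above, in plain Java without any 
Flink dependency. FakeKeyedBackend and CachingSumFunction are hypothetical 
stand-ins invented for illustration, not Flink classes: the fake backend stores 
values as bytes to mimic the serialization cost of RocksDB, and the function 
sums values per key in a heap map and only writes back in snapshotState(). The 
cache here is unbounded, which a real implementation for state larger than 
memory could not afford.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a keyed backend that stores serialized values. */
class FakeKeyedBackend {
    private final Map<String, byte[]> store = new HashMap<>();
    private String currentKey;
    int serdeCalls = 0; // counts serializations plus deserializations

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    /** Reads the value for the current key, or null if none is stored. */
    Long value() {
        byte[] bytes = store.get(currentKey);
        if (bytes == null) {
            return null;
        }
        serdeCalls++; // one deserialization
        return ByteBuffer.wrap(bytes).getLong();
    }

    /** Writes the value for the current key. */
    void update(long v) {
        serdeCalls++; // one serialization
        store.put(currentKey, ByteBuffer.allocate(Long.BYTES).putLong(v).array());
    }
}

/** Hypothetical user function: per-key sum held in a heap cache. */
class CachingSumFunction {
    private final FakeKeyedBackend backend;
    private final Map<String, Long> cache = new HashMap<>();

    CachingSumFunction(FakeKeyedBackend backend) {
        this.backend = backend;
    }

    void processElement(String key, long value) {
        Long cached = cache.get(key);
        if (cached == null) {
            // Cache miss: load any existing state for this key once.
            backend.setCurrentKey(key);
            Long stored = backend.value();
            cached = (stored == null) ? 0L : stored;
        }
        cache.put(key, cached + value); // no backend access on a cache hit
    }

    /** Runs before the backend snapshot: write each entry back under its own key. */
    void snapshotState() {
        for (Map.Entry<String, Long> entry : cache.entrySet()) {
            backend.setCurrentKey(entry.getKey());
            backend.update(entry.getValue());
        }
    }
}

class CacheSketch {
    public static void main(String[] args) {
        FakeKeyedBackend backend = new FakeKeyedBackend();
        CachingSumFunction fn = new CachingSumFunction(backend);
        fn.processElement("a", 1);
        fn.processElement("a", 2);
        fn.processElement("b", 5);
        fn.processElement("a", 4);
        fn.snapshotState();
        System.out.println("serde calls after flush: " + backend.serdeCalls);
        backend.setCurrentKey("a");
        System.out.println("a=" + backend.value());
    }
}
```

In this sketch the backend is touched once per key at snapshot time instead of 
twice per element, which is the saving the proposal is after. It relies on the 
ordering shown in AbstractStreamOperator above: the function snapshot runs 
before the keyed state backend snapshot, so the flushed values are part of the 
checkpoint.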



> Expose setCurrentKey method to streamRuntimeContext
> ---------------------------------------------------
>
>                 Key: FLINK-10343
>                 URL: https://issues.apache.org/jira/browse/FLINK-10343
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.7.0
>            Reporter: aitozi
>            Assignee: aitozi
>            Priority: Major
>             Fix For: 1.7.0
>
>
> When we use reducing state, aggregating state, or similar keyed state, we have to 
> read the value from the state backend, update it with the user function, and then 
> put it back into the state backend. Instead, we could cache certain data on the 
> heap in a map and write it back once in the snapshot method:
> {code:java}
> snapshot() {
>     for (Map.Entry<String, String> entry : map.entrySet()) {
>         setCurrentKey(entry.getKey());
>         valueState.update(entry.getValue()); // put value back to state backend
>     }
> }
> {code}
> We just have to expose setCurrentKey to the user function, which will enable 
> users to cache part of the keyed state in memory themselves.
> What's your opinion, [[email protected]] [~azagrebin]?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
