[
https://issues.apache.org/jira/browse/FLINK-10343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614932#comment-16614932
]
aitozi edited comment on FLINK-10343 at 9/14/18 2:54 PM:
---------------------------------------------------------
Hi, [~kkl0u]
I think having users implement a specific operator themselves is also a good
approach, but it would be better if we could wrap this logic for the user. We
could make it an optional (and possibly pluggable) cache. I will think it
through more carefully later.
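As for your second point, that "keyed state is written before the function
state": I think there is a slight misunderstanding. As can be seen in
AbstractStreamOperator.java, the function's snapshot method is invoked first,
and only afterwards are the operator and keyed state backends snapshotted: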
{code:java}
snapshotState(snapshotContext); // invokes the stateful function's snapshot method first
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
if (null != operatorStateBackend) {
    snapshotInProgress.setOperatorStateManagedFuture(
        operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
if (null != keyedStateBackend) {
    snapshotInProgress.setKeyedStateManagedFuture(
        keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
{code}
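The ordering argument can be checked with a toy model. This is a minimal sketch, assuming a plain HashMap stands in for the keyed state backend and a lambda stands in for the user function's snapshot hook; ToyBackend, checkpoint and checkpointReversed are hypothetical names, not Flink classes. Whatever the function writes in its hook is captured because the backend snapshot is taken afterwards, while the reversed order would miss it.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class SnapshotOrderSketch {
    // Hypothetical stand-in for a keyed state backend: a map from key to value.
    static class ToyBackend {
        final Map<String, Long> table = new HashMap<>();

        // Copy on snapshot, so later writes do not leak into an already taken snapshot.
        Map<String, Long> snapshot() {
            return new HashMap<>(table);
        }
    }

    // Order used in the AbstractStreamOperator excerpt: user snapshot hook first, then the backend.
    static Map<String, Long> checkpoint(ToyBackend backend, Consumer<ToyBackend> userSnapshot) {
        userSnapshot.accept(backend);
        return backend.snapshot();
    }

    // Reversed order, for comparison: the backend is snapshotted before the hook runs.
    static Map<String, Long> checkpointReversed(ToyBackend backend, Consumer<ToyBackend> userSnapshot) {
        Map<String, Long> snap = backend.snapshot();
        userSnapshot.accept(backend);
        return snap;
    }

    public static void main(String[] args) {
        // The user function flushes one cached value during its snapshot hook.
        Consumer<ToyBackend> flush = b -> b.table.put("a", 42L);

        System.out.println(checkpoint(new ToyBackend(), flush));         // {a=42}
        System.out.println(checkpointReversed(new ToyBackend(), flush)); // {}
    }
}
{code}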
Hi [~azagrebin]
My goal is to combine the HeapBackend and the RocksDBBackend to handle state
that is larger than the available memory. AFAIK, in our production scenario,
every incoming element reads its state from the state backend and writes it
back when using reducing state and similar state types. Because the state is
fairly large, we use the RocksDBBackend, and most of the time is spent on
serialization and deserialization, which frequently leads to backpressure. So I
want to expose the ability to put a cache on top of the RocksDBBackend to reduce
serialization and deserialization. To preserve exactly-once semantics, the
cached data then has to be written back to state by looping over the cache
entries and setting the current key for each one.
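The serialization cost described above can be illustrated with a toy model. This is a minimal sketch under stated assumptions: SerializingBackend is a hypothetical stand-in for RocksDB that stores every value as bytes and counts (de)serializations, and SumCache is a hypothetical heap cache in front of it, not an existing Flink class. A summing reduce over 1000 elements of one key costs 1000 serializations and 999 deserializations when every element does a read-modify-write against the backend, but a single serialization when the running value stays on the heap and is written back once on flush().

{code:java}
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class CachedReducingSketch {
    // Hypothetical backend that, like RocksDB, keeps only serialized bytes and counts (de)serializations.
    static class SerializingBackend {
        final Map<String, byte[]> store = new HashMap<>();
        int serializations = 0;
        int deserializations = 0;

        Long get(String key) {
            byte[] bytes = store.get(key);
            if (bytes == null) {
                return null;
            }
            deserializations++;
            return ByteBuffer.wrap(bytes).getLong();
        }

        void put(String key, long value) {
            serializations++;
            store.put(key, ByteBuffer.allocate(Long.BYTES).putLong(value).array());
        }
    }

    // Per-element read-modify-write, as a reducing state on RocksDB does for every record.
    static void sumUncached(SerializingBackend backend, String key, long v) {
        Long old = backend.get(key);
        backend.put(key, old == null ? v : old + v);
    }

    // Hypothetical heap cache in front of the backend; values are written back on flush().
    static class SumCache {
        final SerializingBackend backend;
        final Map<String, Long> cache = new HashMap<>();

        SumCache(SerializingBackend backend) {
            this.backend = backend;
        }

        void add(String key, long v) {
            Long cur = cache.get(key);
            if (cur == null) {
                cur = backend.get(key); // one backend read on a cache miss
            }
            cache.put(key, cur == null ? v : cur + v);
        }

        // Intended to run in the snapshot hook, before the backend itself is snapshotted.
        void flush() {
            for (Map.Entry<String, Long> e : cache.entrySet()) {
                backend.put(e.getKey(), e.getValue());
            }
            cache.clear();
        }
    }

    public static void main(String[] args) {
        SerializingBackend plain = new SerializingBackend();
        for (int i = 1; i <= 1000; i++) {
            sumUncached(plain, "k", i);
        }

        SerializingBackend cachedBackend = new SerializingBackend();
        SumCache cache = new SumCache(cachedBackend);
        for (int i = 1; i <= 1000; i++) {
            cache.add("k", i);
        }
        cache.flush();

        // Counters are printed before any further get() so they reflect only the loops above.
        System.out.println("uncached: " + plain.serializations + " ser / " + plain.deserializations + " deser"); // uncached: 1000 ser / 999 deser
        System.out.println("cached: " + cachedBackend.serializations + " ser / " + cachedBackend.deserializations + " deser"); // cached: 1 ser / 0 deser
        System.out.println(plain.get("k") + " == " + cachedBackend.get("k")); // 500500 == 500500
    }
}
{code}

The trade-off is that cached values live only on the heap until flush() runs, so the flush has to happen in the snapshot hook, before the backend snapshot, to keep exactly-once. The toy addresses keys directly; in a real keyed backend each write-back also needs the current key set to the entry's key.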
> Expose setCurrentKey method to streamRuntimeContext
> ---------------------------------------------------
>
> Key: FLINK-10343
> URL: https://issues.apache.org/jira/browse/FLINK-10343
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.7.0
> Reporter: aitozi
> Assignee: aitozi
> Priority: Major
> Fix For: 1.7.0
>
>
> When we use reducing state, aggregating keyed state and so on, we have to
> read the value from the state backend, update it with the user function and
> then put it back into the state backend. If we could instead cache certain data
> on the heap in a map and write it back only once in the snapshot method with
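> {code:java}
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     for (Map.Entry<String, String> entry : map.entrySet()) {
>         setCurrentKey(entry.getKey()); // proposed by this issue; not yet exposed to user functions
>         valueState.update(entry.getValue()); // put the value back into the state backend
>     }
> }
> {code}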
> then we would just have to expose setCurrentKey to the user function, which
> would enable users to cache part of their keyed state in memory themselves.
> What's your opinion, [[email protected]] [~azagrebin]?
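Why the flush in the snapshot method needs setCurrentKey at all can be shown with a toy keyed backend. This is a minimal sketch, assuming a hypothetical ToyKeyedBackend whose update writes under an implicit current key, analogous to how Flink's keyed ValueState#update is scoped to the current key; none of these names are Flink APIs. Flushing a two-entry cache with a key switch per entry yields two keyed entries, while flushing without it overwrites a single key.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class KeyScopedFlushSketch {
    // Hypothetical keyed backend: writes are implicitly scoped to the current key,
    // the way Flink keyed state is scoped to the key of the element being processed.
    static class ToyKeyedBackend {
        final Map<String, String> table = new HashMap<>();
        private String currentKey;

        void setCurrentKey(String key) {
            currentKey = key;
        }

        // Analogue of ValueState#update: writes under whatever key is current.
        void update(String value) {
            table.put(currentKey, value);
        }
    }

    // Flush as proposed in the issue: switch the current key for every cached entry.
    static void flushWithKeySwitch(ToyKeyedBackend backend, Map<String, String> cache) {
        for (Map.Entry<String, String> entry : cache.entrySet()) {
            backend.setCurrentKey(entry.getKey());
            backend.update(entry.getValue());
        }
    }

    // Without access to setCurrentKey, every write lands on the last key the operator set.
    static void flushWithoutKeySwitch(ToyKeyedBackend backend, Map<String, String> cache) {
        for (String value : cache.values()) {
            backend.update(value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        cache.put("k1", "v1");
        cache.put("k2", "v2");

        ToyKeyedBackend good = new ToyKeyedBackend();
        flushWithKeySwitch(good, cache);
        System.out.println(good.table.size()); // 2

        ToyKeyedBackend bad = new ToyKeyedBackend();
        bad.setCurrentKey("k2"); // key of the last element processed before the snapshot
        flushWithoutKeySwitch(bad, cache);
        System.out.println(bad.table.size()); // 1
    }
}
{code}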
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)