[ https://issues.apache.org/jira/browse/FLINK-5715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15902840#comment-15902840 ]

ASF GitHub Bot commented on FLINK-5715:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3466#discussion_r105129267
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java ---
    @@ -68,62 +63,29 @@ public HeapReducingState(
     
        @Override
        public V get() {
    -           Preconditions.checkState(currentNamespace != null, "No namespace set.");
    -           Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
    -
    -           Map<N, Map<K, V>> namespaceMap =
    -                           stateTable.get(backend.getCurrentKeyGroupIndex());
    -
    -           if (namespaceMap == null) {
    -                   return null;
    -           }
    -
    -           Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
    -
    -           if (keyedMap == null) {
    -                   return null;
    -           }
    -
    -           return keyedMap.get(backend.<K>getCurrentKey());
    +           return stateTable.get(currentNamespace);
        }
     
        @Override
        public void add(V value) throws IOException {
    -           Preconditions.checkState(currentNamespace != null, "No namespace set.");
    -           Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
    +           final N namespace = currentNamespace;
     
                if (value == null) {
                        clear();
                        return;
                }
     
    -           Map<N, Map<K, V>> namespaceMap =
    -                           stateTable.get(backend.getCurrentKeyGroupIndex());
    -
    -           if (namespaceMap == null) {
    -                   namespaceMap = createNewMap();
    -                   stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
    -           }
    -
    -           Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
    -
    -           if (keyedMap == null) {
    -                   keyedMap = createNewMap();
    -                   namespaceMap.put(currentNamespace, keyedMap);
    -           }
    -
    -           V currentValue = keyedMap.put(backend.<K>getCurrentKey(), value);
    +           final StateTable<K, N, V> map = stateTable;
    +           final V currentValue = map.putAndGetOld(namespace, value);
    --- End diff --
    
    I have already generalized and implemented the push-down as part of #3483
    (to avoid too much rebasing). It would be nice if you could also take a look at that.
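The diff pushes the key-group and current-key lookups down into the state table, so `get()` and `add()` only pass the namespace. A minimal sketch of that shape follows, under stated assumptions: `SketchStateTable` and `SketchReducingState` are hypothetical classes, not Flink's `StateTable` or `HeapReducingState`. The table keeps the old key group -> namespace -> key nesting internally and only moves it behind a namespace-only API with a `putAndGetOld` method like the one in the diff; the key-group assignment is a simplified placeholder.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

public class PushDownSketch {

    /** Hypothetical table: callers pass only the namespace; key group and key are resolved here. */
    static final class SketchStateTable<K, N, V> {
        private final int numKeyGroups;
        // key group -> namespace -> key -> value, the nesting the old code walked by hand
        private final Map<Integer, Map<N, Map<K, V>>> table = new HashMap<>();
        private K currentKey;

        SketchStateTable(int numKeyGroups) {
            this.numKeyGroups = numKeyGroups;
        }

        void setCurrentKey(K key) {
            this.currentKey = key;
        }

        private int currentKeyGroup(N namespace) {
            if (currentKey == null || namespace == null) {
                throw new IllegalStateException("No key or namespace set.");
            }
            // Simplified assignment for illustration only; not Flink's key-group hashing.
            return Math.floorMod(currentKey.hashCode(), numKeyGroups);
        }

        private Map<K, V> lookupKeyedMap(N namespace) {
            Map<N, Map<K, V>> namespaceMap = table.get(currentKeyGroup(namespace));
            return namespaceMap == null ? null : namespaceMap.get(namespace);
        }

        V get(N namespace) {
            Map<K, V> keyedMap = lookupKeyedMap(namespace);
            return keyedMap == null ? null : keyedMap.get(currentKey);
        }

        /** Stores the value for (current key, namespace) and returns the previous value or null. */
        V putAndGetOld(N namespace, V value) {
            return table
                    .computeIfAbsent(currentKeyGroup(namespace), g -> new HashMap<>())
                    .computeIfAbsent(namespace, n -> new HashMap<>())
                    .put(currentKey, value);
        }

        void remove(N namespace) {
            Map<K, V> keyedMap = lookupKeyedMap(namespace);
            if (keyedMap != null) {
                keyedMap.remove(currentKey);
            }
        }
    }

    /** Reducing state written against the namespace-only API, following the shape of the diff. */
    static final class SketchReducingState<K, N, V> {
        private final SketchStateTable<K, N, V> stateTable;
        private final BinaryOperator<V> reduceFunction;
        private N currentNamespace;

        SketchReducingState(SketchStateTable<K, N, V> stateTable, BinaryOperator<V> reduceFunction) {
            this.stateTable = stateTable;
            this.reduceFunction = reduceFunction;
        }

        void setCurrentNamespace(N namespace) {
            this.currentNamespace = namespace;
        }

        V get() {
            return stateTable.get(currentNamespace);
        }

        void add(V value) {
            if (value == null) {
                clear();
                return;
            }
            V oldValue = stateTable.putAndGetOld(currentNamespace, value);
            if (oldValue != null) {
                // Overwrite the value just stored with the reduced result.
                stateTable.putAndGetOld(currentNamespace, reduceFunction.apply(oldValue, value));
            }
        }

        void clear() {
            stateTable.remove(currentNamespace);
        }
    }

    public static void main(String[] args) {
        SketchStateTable<String, String, Integer> table = new SketchStateTable<>(128);
        SketchReducingState<String, String, Integer> sum = new SketchReducingState<>(table, Integer::sum);
        sum.setCurrentNamespace("window-1");
        table.setCurrentKey("alice");
        sum.add(3);
        sum.add(4);
        table.setCurrentKey("bob");
        sum.add(10);
        table.setCurrentKey("alice");
        System.out.println(sum.get()); // 7
    }
}
```

One plausible benefit of this design: the state classes no longer depend on the table layout, so a different table implementation (for example, a copy-on-write one) could be swapped in behind the same calls.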


> Asynchronous snapshotting for HeapKeyedStateBackend
> ---------------------------------------------------
>
>                 Key: FLINK-5715
>                 URL: https://issues.apache.org/jira/browse/FLINK-5715
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> Blocking snapshots make the HeapKeyedStateBackend practically unusable for
> many users in production: their jobs cannot tolerate processing being stopped
> for the time it takes to write gigabytes of data from memory to disk.
> Asynchronous snapshots would solve this problem. The challenge for the
> implementation is to come up with a copy-on-write (CoW) scheme for the in-memory
> hash maps that form the foundation of this backend. On closer inspection, this
> problem is twofold. First, the hash map itself is a mutable structure and needs
> CoW semantics that avoid costly locking or blocking where possible. Second, the
> mutable value objects need CoW as well, e.g. by cloning them via serializers.
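One way to read the two parts of the problem is sketched below. This is a simplified illustration under stated assumptions, not the scheme the issue ended up implementing: `CowMapSketch` is hypothetical, the structural copy is a shallow reference copy of the map (O(n) reference copies, but no value copies and no I/O), and `valueCopier` stands in for a serializer-based deep copy. Entries remember the map version in which they were written; after a snapshot, the first mutable access to an older entry copies its value, so the snapshot keeps the original object.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical copy-on-write map sketch (not Flink code). All mutations and snapshot() run on one thread. */
public class CowMapSketch<K, V> {

    /** Immutable entry that remembers the map version in which it was written. */
    private static final class VersionedEntry<T> {
        final T value;
        final int version;

        VersionedEntry(T value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<K, VersionedEntry<V>> live = new HashMap<>();
    private final UnaryOperator<V> valueCopier; // assumed deep copy, e.g. via a serializer
    private int version = 0;

    public CowMapSketch(UnaryOperator<V> valueCopier) {
        this.valueCopier = valueCopier;
    }

    /** Replaces the mapping; an earlier snapshot keeps its own reference to the old value. */
    public void put(K key, V value) {
        live.put(key, new VersionedEntry<>(value, version));
    }

    public void remove(K key) {
        live.remove(key);
    }

    /** Read-only access; callers must not mutate the returned value. */
    public V get(K key) {
        VersionedEntry<V> e = live.get(key);
        return e == null ? null : e.value;
    }

    /**
     * Access for in-place mutation. An entry written before the latest snapshot may share its
     * value with that snapshot, so the value is copied once before being handed out. Callers
     * must not keep the returned reference across a snapshot() call.
     */
    public V getForUpdate(K key) {
        VersionedEntry<V> e = live.get(key);
        if (e == null) {
            return null;
        }
        if (e.version < version) {
            e = new VersionedEntry<>(valueCopier.apply(e.value), version);
            live.put(key, e);
        }
        return e.value;
    }

    /**
     * Synchronous part of a snapshot: copies key/value references only (no value copies,
     * no I/O), then bumps the version so every existing entry counts as shared.
     */
    public Map<K, V> snapshot() {
        Map<K, V> frozen = new HashMap<>();
        for (Map.Entry<K, VersionedEntry<V>> e : live.entrySet()) {
            frozen.put(e.getKey(), e.getValue().value);
        }
        version++;
        return Collections.unmodifiableMap(frozen);
    }

    public static void main(String[] args) {
        CowMapSketch<String, List<Integer>> map = new CowMapSketch<>(list -> new ArrayList<>(list));
        map.put("a", new ArrayList<>(List.of(1)));
        Map<String, List<Integer>> snap = map.snapshot();
        map.getForUpdate("a").add(2); // copies "a" first, so the snapshot is unaffected
        map.put("b", new ArrayList<>(List.of(3)));
        System.out.println(snap);         // {a=[1]}
        System.out.println(map.get("a")); // [1, 2]
    }
}
```

The frozen map could then be handed to another thread for writing (through a thread-safe hand-off such as an executor) while processing continues. A fuller scheme would also track when snapshots are released, so that entries are not copied needlessly once no snapshot references them; this sketch copies conservatively, at most once per entry per snapshot.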



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
