[ 
https://issues.apache.org/jira/browse/FLINK-5715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15902820#comment-15902820
 ] 

ASF GitHub Bot commented on FLINK-5715:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3466#discussion_r105126769
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java 
---
    @@ -7,109 +7,167 @@
      * "License"); you may not use this file except in compliance
      * with the License.  You may obtain a copy of the License at
      *
    - *     http://www.apache.org/licenses/LICENSE-2.0
    + * http://www.apache.org/licenses/LICENSE-2.0
      *
      * Unless required by applicable law or agreed to in writing, software
      * distributed under the License is distributed on an "AS IS" BASIS,
      * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +
     package org.apache.flink.runtime.state.heap;
     
     import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.api.common.typeutils.TypeSerializer;
     import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
    -import org.apache.flink.runtime.state.KeyGroupRange;
    -
    -import java.util.Map;
    -
    -public class StateTable<K, N, ST> {
    -
    -   /** Map for holding the actual state objects. */
    -   private final Map<N, Map<K, ST>>[] state;
    -
    -   /** The offset to the contiguous key groups */
    -   private final int keyGroupOffset;
    -
    -   /** Combined meta information such as name and serializers for this 
state */
    -   private RegisteredBackendStateMetaInfo<N, ST> metaInfo;
    -
    -   // 
------------------------------------------------------------------------
    -   public StateTable(RegisteredBackendStateMetaInfo<N, ST> metaInfo, 
KeyGroupRange keyGroupRange) {
    -           this.metaInfo = metaInfo;
    -           this.keyGroupOffset = keyGroupRange.getStartKeyGroup();
    -
    -           @SuppressWarnings("unchecked")
    -           Map<N, Map<K, ST>>[] state = (Map<N, Map<K, ST>>[]) new 
Map[keyGroupRange.getNumberOfKeyGroups()];
    -           this.state = state;
    -   }
    -
    -   // 
------------------------------------------------------------------------
    -   //  access to maps
    -   // 
------------------------------------------------------------------------
    +import org.apache.flink.util.Preconditions;
     
    -   public Map<N, Map<K, ST>>[] getState() {
    -           return state;
    -   }
    -
    -   public Map<N, Map<K, ST>> get(int index) {
    -           final int pos = indexToOffset(index);
    -           if (pos >= 0 && pos < state.length) {
    -                   return state[pos];
    -           } else {
    -                   return null;
    -           }
    -   }
    -
    -   public void set(int index, Map<N, Map<K, ST>> map) {
    -           try {
    -                   state[indexToOffset(index)] = map;
    -           }
    -           catch (ArrayIndexOutOfBoundsException e) {
    -                   throw new IllegalArgumentException("Key group index out 
of range of key group range [" +
    -                                   keyGroupOffset + ", " + (keyGroupOffset 
+ state.length) + ").");
    -           }
    +/**
    + * Base class for state tables. Accesses to state are typically scoped by 
the currently active key, as provided
    + * through the {@link KeyContext}.
    + *
    + * @param <K> type of key
    + * @param <N> type of namespace
    + * @param <S> type of state
    + */
    +public abstract class StateTable<K, N, S> {
    +
    +   /**
    +    * The key context view on the backend. This provides information, such 
as the currently active key.
    +    */
    +   protected final KeyContext<K> keyContext;
    +
    +   /**
    +    * Combined meta information such as name and serializers for this state
    +    */
    +   protected RegisteredBackendStateMetaInfo<N, S> metaInfo;
    +
    +   /**
    +    *
    +    * @param keyContext the key context provides the key scope for all 
put/get/delete operations.
    +    * @param metaInfo the meta information, including the type serializer 
for state copy-on-write.
    +    */
    +   public StateTable(KeyContext<K> keyContext, 
RegisteredBackendStateMetaInfo<N, S> metaInfo) {
    +           this.keyContext = Preconditions.checkNotNull(keyContext);
    +           this.metaInfo = Preconditions.checkNotNull(metaInfo);
        }
     
    -   private int indexToOffset(int index) {
    -           return index - keyGroupOffset;
    +   // Main interface methods of StateTable 
-------------------------------------------------------
    +
    +   /**
    +    * Returns whether this {@link NestedMapsStateTable} is empty.
    +    *
    +    * @return {@code true} if this {@link NestedMapsStateTable} has no 
elements, {@code false}
    +    * otherwise.
    +    * @see #size()
    +    */
    +   public boolean isEmpty() {
    +           return size() == 0;
        }
     
    -   // 
------------------------------------------------------------------------
    -   //  metadata
    -   // 
------------------------------------------------------------------------
    -   
    -   public TypeSerializer<ST> getStateSerializer() {
    +   /**
    +    * Returns the total number of entries in this {@link 
NestedMapsStateTable}. This is the sum of both sub-tables.
    +    *
    +    * @return the number of entries in this {@link NestedMapsStateTable}.
    +    */
    +   public abstract int size();
    +
    +   /**
    +    * Returns the value of the mapping for the composite of active key and 
given namespace.
    +    *
    +    * @param namespace the namespace. Not null.
    +    * @return the value of the mapping with the specified key/namespace 
composite key, or {@code null}
    +    * if no mapping for the specified key is found.
    +    */
    +   public abstract S get(Object namespace);
    +
    +   /**
    +    * Returns whether this table contains a mapping for the composite of 
active key and given namespace.
    +    *
    +    * @param namespace the namespace in the composite key to search for. 
Not null.
    +    * @return {@code true} if this map contains the specified 
key/namespace composite key,
    +    * {@code false} otherwise.
    +    */
    +   public abstract boolean containsKey(Object namespace);
    +
    +   /**
    +    * Maps the composite of active key and given namespace to the 
specified value. This method should be preferred
    +    * over {@link #putAndGetOld(Object, Object)} (Object, Object)} when 
the caller is not interested in the old value.
    +    *
    +    * @param namespace the namespace. Not null.
    +    * @param state     the value. Can be null.
    +    */
    +   public abstract void put(N namespace, S state);
    +
    +   /**
    +    * Maps the composite of active key and given namespace to the 
specified value. Returns the previous state that
    +    * was registered under the composite key.
    +    *
    +    * @param namespace the namespace. Not null.
    +    * @param state     the value. Can be null.
    +    * @return the value of any previous mapping with the specified key or
    +    * {@code null} if there was no such mapping.
    +    */
    +   public abstract S putAndGetOld(N namespace, S state);
    +
    +   /**
    +    * Removes the mapping for the composite of active key and given 
namespace. This method should be preferred
    +    * over {@link #removeAndGetOld(Object)} when the caller is not 
interested in the old value.
    +    *
    +    * @param namespace the namespace of the mapping to remove. Not null.
    +    */
    +   public abstract void remove(Object namespace);
    --- End diff --
    
    Sometimes the namespace is typed as `N`, and sometimes as `Object`. Is that an optimization or a coincidence?


> Asynchronous snapshotting for HeapKeyedStateBackend
> ---------------------------------------------------
>
>                 Key: FLINK-5715
>                 URL: https://issues.apache.org/jira/browse/FLINK-5715
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> Blocking snapshots render the HeapKeyedStateBackend practically unusable for 
> many users in production. Their jobs cannot tolerate stopped processing for 
> the time it takes to write gigabytes of data from memory to disk. 
> Asynchronous snapshots would be a solution to this problem. The challenge for 
> the implementation is coming up with a copy-on-write scheme for the in-memory 
> hash maps that form the foundation of this backend. After taking a closer 
> look, this problem is twofold. First, we need CoW semantics for the hash map 
> itself, as a mutable structure, avoiding costly locking or blocking where 
> possible. Second, we need CoW for the mutable value objects, e.g. through 
> cloning via serializers.  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to