Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3466#discussion_r105126769
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java 
---
    @@ -7,109 +7,167 @@
      * "License"); you may not use this file except in compliance
      * with the License.  You may obtain a copy of the License at
      *
    - *     http://www.apache.org/licenses/LICENSE-2.0
    + * http://www.apache.org/licenses/LICENSE-2.0
      *
      * Unless required by applicable law or agreed to in writing, software
      * distributed under the License is distributed on an "AS IS" BASIS,
      * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +
     package org.apache.flink.runtime.state.heap;
     
     import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.api.common.typeutils.TypeSerializer;
     import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
    -import org.apache.flink.runtime.state.KeyGroupRange;
    -
    -import java.util.Map;
    -
    -public class StateTable<K, N, ST> {
    -
    -   /** Map for holding the actual state objects. */
    -   private final Map<N, Map<K, ST>>[] state;
    -
    -   /** The offset to the contiguous key groups */
    -   private final int keyGroupOffset;
    -
    -   /** Combined meta information such as name and serializers for this 
state */
    -   private RegisteredBackendStateMetaInfo<N, ST> metaInfo;
    -
    -   // 
------------------------------------------------------------------------
    -   public StateTable(RegisteredBackendStateMetaInfo<N, ST> metaInfo, 
KeyGroupRange keyGroupRange) {
    -           this.metaInfo = metaInfo;
    -           this.keyGroupOffset = keyGroupRange.getStartKeyGroup();
    -
    -           @SuppressWarnings("unchecked")
    -           Map<N, Map<K, ST>>[] state = (Map<N, Map<K, ST>>[]) new 
Map[keyGroupRange.getNumberOfKeyGroups()];
    -           this.state = state;
    -   }
    -
    -   // 
------------------------------------------------------------------------
    -   //  access to maps
    -   // 
------------------------------------------------------------------------
    +import org.apache.flink.util.Preconditions;
     
    -   public Map<N, Map<K, ST>>[] getState() {
    -           return state;
    -   }
    -
    -   public Map<N, Map<K, ST>> get(int index) {
    -           final int pos = indexToOffset(index);
    -           if (pos >= 0 && pos < state.length) {
    -                   return state[pos];
    -           } else {
    -                   return null;
    -           }
    -   }
    -
    -   public void set(int index, Map<N, Map<K, ST>> map) {
    -           try {
    -                   state[indexToOffset(index)] = map;
    -           }
    -           catch (ArrayIndexOutOfBoundsException e) {
    -                   throw new IllegalArgumentException("Key group index out 
of range of key group range [" +
    -                                   keyGroupOffset + ", " + (keyGroupOffset 
+ state.length) + ").");
    -           }
    +/**
    + * Base class for state tables. Accesses to state are typically scoped by 
the currently active key, as provided
    + * through the {@link KeyContext}.
    + *
    + * @param <K> type of key
    + * @param <N> type of namespace
    + * @param <S> type of state
    + */
    +public abstract class StateTable<K, N, S> {
    +
    +   /**
    +    * The key context view on the backend. This provides information, such 
as the currently active key.
    +    */
    +   protected final KeyContext<K> keyContext;
    +
    +   /**
    +    * Combined meta information such as name and serializers for this state
    +    */
    +   protected RegisteredBackendStateMetaInfo<N, S> metaInfo;
    +
    +   /**
    +    *
    +    * @param keyContext the key context provides the key scope for all 
put/get/delete operations.
    +    * @param metaInfo the meta information, including the type serializer 
for state copy-on-write.
    +    */
    +   public StateTable(KeyContext<K> keyContext, 
RegisteredBackendStateMetaInfo<N, S> metaInfo) {
    +           this.keyContext = Preconditions.checkNotNull(keyContext);
    +           this.metaInfo = Preconditions.checkNotNull(metaInfo);
        }
     
    -   private int indexToOffset(int index) {
    -           return index - keyGroupOffset;
    +   // Main interface methods of StateTable 
-------------------------------------------------------
    +
    +   /**
    +    * Returns whether this {@link NestedMapsStateTable} is empty.
    +    *
    +    * @return {@code true} if this {@link NestedMapsStateTable} has no 
elements, {@code false}
    +    * otherwise.
    +    * @see #size()
    +    */
    +   public boolean isEmpty() {
    +           return size() == 0;
        }
     
    -   // 
------------------------------------------------------------------------
    -   //  metadata
    -   // 
------------------------------------------------------------------------
    -   
    -   public TypeSerializer<ST> getStateSerializer() {
    +   /**
    +    * Returns the total number of entries in this {@link 
NestedMapsStateTable}. This is the sum of both sub-tables.
    +    *
    +    * @return the number of entries in this {@link NestedMapsStateTable}.
    +    */
    +   public abstract int size();
    +
    +   /**
    +    * Returns the value of the mapping for the composite of active key and 
given namespace.
    +    *
    +    * @param namespace the namespace. Not null.
    +    * @return the value of the mapping with the specified key/namespace 
composite key, or {@code null}
    +    * if no mapping for the specified key is found.
    +    */
    +   public abstract S get(Object namespace);
    +
    +   /**
    +    * Returns whether this table contains a mapping for the composite of 
active key and given namespace.
    +    *
    +    * @param namespace the namespace in the composite key to search for. 
Not null.
    +    * @return {@code true} if this map contains the specified 
key/namespace composite key,
    +    * {@code false} otherwise.
    +    */
    +   public abstract boolean containsKey(Object namespace);
    +
    +   /**
    +    * Maps the composite of active key and given namespace to the 
specified value. This method should be preferred
    +    * over {@link #putAndGetOld(Object, Object)} (Object, Object)} when 
the caller is not interested in the old value.
    +    *
    +    * @param namespace the namespace. Not null.
    +    * @param state     the value. Can be null.
    +    */
    +   public abstract void put(N namespace, S state);
    +
    +   /**
    +    * Maps the composite of active key and given namespace to the 
specified value. Returns the previous state that
    +    * was registered under the composite key.
    +    *
    +    * @param namespace the namespace. Not null.
    +    * @param state     the value. Can be null.
    +    * @return the value of any previous mapping with the specified key or
    +    * {@code null} if there was no such mapping.
    +    */
    +   public abstract S putAndGetOld(N namespace, S state);
    +
    +   /**
    +    * Removes the mapping for the composite of active key and given 
namespace. This method should be preferred
    +    * over {@link #removeAndGetOld(Object)} when the caller is not 
interested in the old value.
    +    *
    +    * @param namespace the namespace of the mapping to remove. Not null.
    +    */
    +   public abstract void remove(Object namespace);
    --- End diff --
    
    Sometimes the namespace is typed as `N`, sometimes as `Object`. Is that an optimization or a coincidence?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to