[
https://issues.apache.org/jira/browse/FLINK-5715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15902820#comment-15902820
]
ASF GitHub Bot commented on FLINK-5715:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/3466#discussion_r105126769
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
---
@@ -7,109 +7,167 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.state.heap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
-import org.apache.flink.runtime.state.KeyGroupRange;
-
-import java.util.Map;
-
-public class StateTable<K, N, ST> {
-
- /** Map for holding the actual state objects. */
- private final Map<N, Map<K, ST>>[] state;
-
- /** The offset to the contiguous key groups */
- private final int keyGroupOffset;
-
- /** Combined meta information such as name and serializers for this
state */
- private RegisteredBackendStateMetaInfo<N, ST> metaInfo;
-
- //
------------------------------------------------------------------------
- public StateTable(RegisteredBackendStateMetaInfo<N, ST> metaInfo,
KeyGroupRange keyGroupRange) {
- this.metaInfo = metaInfo;
- this.keyGroupOffset = keyGroupRange.getStartKeyGroup();
-
- @SuppressWarnings("unchecked")
- Map<N, Map<K, ST>>[] state = (Map<N, Map<K, ST>>[]) new
Map[keyGroupRange.getNumberOfKeyGroups()];
- this.state = state;
- }
-
- //
------------------------------------------------------------------------
- // access to maps
- //
------------------------------------------------------------------------
+import org.apache.flink.util.Preconditions;
- public Map<N, Map<K, ST>>[] getState() {
- return state;
- }
-
- public Map<N, Map<K, ST>> get(int index) {
- final int pos = indexToOffset(index);
- if (pos >= 0 && pos < state.length) {
- return state[pos];
- } else {
- return null;
- }
- }
-
- public void set(int index, Map<N, Map<K, ST>> map) {
- try {
- state[indexToOffset(index)] = map;
- }
- catch (ArrayIndexOutOfBoundsException e) {
- throw new IllegalArgumentException("Key group index out
of range of key group range [" +
- keyGroupOffset + ", " + (keyGroupOffset
+ state.length) + ").");
- }
+/**
+ * Base class for state tables. Accesses to state are typically scoped by
the currently active key, as provided
+ * through the {@link KeyContext}.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public abstract class StateTable<K, N, S> {
+
+ /**
+ * The key context view on the backend. This provides information, such
as the currently active key.
+ */
+ protected final KeyContext<K> keyContext;
+
+ /**
+ * Combined meta information such as name and serializers for this state
+ */
+ protected RegisteredBackendStateMetaInfo<N, S> metaInfo;
+
+ /**
+ *
+ * @param keyContext the key context provides the key scope for all
put/get/delete operations.
+ * @param metaInfo the meta information, including the type serializer
for state copy-on-write.
+ */
+ public StateTable(KeyContext<K> keyContext,
RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+ this.keyContext = Preconditions.checkNotNull(keyContext);
+ this.metaInfo = Preconditions.checkNotNull(metaInfo);
}
- private int indexToOffset(int index) {
- return index - keyGroupOffset;
+ // Main interface methods of StateTable
-------------------------------------------------------
+
+ /**
+ * Returns whether this {@link NestedMapsStateTable} is empty.
+ *
+ * @return {@code true} if this {@link NestedMapsStateTable} has no
elements, {@code false}
+ * otherwise.
+ * @see #size()
+ */
+ public boolean isEmpty() {
+ return size() == 0;
}
- //
------------------------------------------------------------------------
- // metadata
- //
------------------------------------------------------------------------
-
- public TypeSerializer<ST> getStateSerializer() {
+ /**
+ * Returns the total number of entries in this {@link
NestedMapsStateTable}. This is the sum of both sub-tables.
+ *
+ * @return the number of entries in this {@link NestedMapsStateTable}.
+ */
+ public abstract int size();
+
+ /**
+ * Returns the value of the mapping for the composite of active key and
given namespace.
+ *
+ * @param namespace the namespace. Not null.
+ * @return the value of the mapping with the specified key/namespace
composite key, or {@code null}
+ * if no mapping for the specified key is found.
+ */
+ public abstract S get(Object namespace);
+
+ /**
+ * Returns whether this table contains a mapping for the composite of
active key and given namespace.
+ *
+ * @param namespace the namespace in the composite key to search for.
Not null.
+ * @return {@code true} if this map contains the specified
key/namespace composite key,
+ * {@code false} otherwise.
+ */
+ public abstract boolean containsKey(Object namespace);
+
+ /**
+ * Maps the composite of active key and given namespace to the
specified value. This method should be preferred
+ * over {@link #putAndGetOld(Object, Object)} (Object, Object)} when
the caller is not interested in the old value.
+ *
+ * @param namespace the namespace. Not null.
+ * @param state the value. Can be null.
+ */
+ public abstract void put(N namespace, S state);
+
+ /**
+ * Maps the composite of active key and given namespace to the
specified value. Returns the previous state that
+ * was registered under the composite key.
+ *
+ * @param namespace the namespace. Not null.
+ * @param state the value. Can be null.
+ * @return the value of any previous mapping with the specified key or
+ * {@code null} if there was no such mapping.
+ */
+ public abstract S putAndGetOld(N namespace, S state);
+
+ /**
+ * Removes the mapping for the composite of active key and given
namespace. This method should be preferred
+ * over {@link #removeAndGetOld(Object)} when the caller is not
interested in the old value.
+ *
+ * @param namespace the namespace of the mapping to remove. Not null.
+ */
+ public abstract void remove(Object namespace);
--- End diff --
Sometimes, the namespace is typed to `N`, sometimes to `Object`. Is that an
optimization or coincidence?
> Asynchronous snapshotting for HeapKeyedStateBackend
> ---------------------------------------------------
>
> Key: FLINK-5715
> URL: https://issues.apache.org/jira/browse/FLINK-5715
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> Blocking snapshots render the HeapKeyedStateBackend practically unusable for
> many users in production. Their jobs cannot tolerate stopped processing for
> the time it takes to write gigabytes of data from memory to disk.
> Asynchronous snapshots would be a solution to this problem. The challenge for
> the implementation is coming up with a copy-on-write scheme for the in-memory
> hash maps that build the foundation of this backend. After taking a closer
> look, this problem is twofold. First, providing CoW semantics for the hash map
> itself, as a mutable structure, thereby avoiding costly locking or blocking
> where possible. Second, CoW for the mutable value objects, e.g. through
> cloning via serializers.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)