http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTable.java new file mode 100644 index 0000000..6c9c14c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTable.java @@ -0,0 +1,1066 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.TreeSet; + +/** + * Implementation of Flink's in-memory state tables with copy-on-write support. This map does not support null values + * for key or namespace. + * <p> + * {@link CopyOnWriteStateTable} sacrifices some peak performance and memory efficiency for features like incremental + * rehashing and asynchronous snapshots through copy-on-write. Copy-on-write tries to minimize the amount of copying by + * maintaining version meta data for both, the map structure and the state objects. However, we must often proactively + * copy state objects when we hand them to the user. + * <p> + * As for any state backend, user should not keep references on state objects that they obtained from state backends + * outside the scope of the user function calls. + * <p> + * Some brief maintenance notes: + * <p> + * 1) Flattening the underlying data structure from nested maps (namespace) -> (key) -> (state) to one flat map + * (key, namespace) -> (state) brings certain performance trade-offs. In theory, the flat map has one less level of + * indirection compared to the nested map. However, the nested map naturally de-duplicates namespace objects for which + * #equals() is true. This leads to potentially a lot of redundant namespace objects for the flattened version. 
Those, + * in turn, can again introduce more cache misses because we need to follow the namespace object on all operations to + * ensure entry identities. Obviously, copy-on-write can also add memory overhead. So does the meta data to track + * copy-on-write requirement (state and entry versions on {@link StateTableEntry}). + * <p> + * 2) A flat map structure is a lot easier when it comes to tracking copy-on-write of the map structure. + * <p> + * 3) Nested structure had the (never used) advantage that we can easily drop and iterate whole namespaces. This could + * give locality advantages for certain access pattern, e.g. iterating a namespace. + * <p> + * 4) Serialization format is changed from namespace-prefix compressed (as naturally provided from the old nested + * structure) to making all entries self contained as (key, namespace, state). + * <p> + * 5) We got rid of having multiple nested tables, one for each key-group. Instead, we partition state into key-groups + * on-the-fly, during the asynchronous part of a snapshot. + * <p> + * 6) Currently, a state table can only grow, but never shrinks on low load. We could easily add this if required. + * <p> + * 7) Heap based state backends like this can easily cause a lot of GC activity. Besides using G1 as garbage collector, + * we should provide an additional state backend that operates on off-heap memory. This would sacrifice peak performance + * (due to de/serialization of objects) for a lower, but more constant throughput and potentially huge simplifications + * w.r.t. copy-on-write. + * <p> + * 8) We could try a hybrid of a serialized and object based backends, where key and namespace of the entries are both + * serialized in one byte-array. + * <p> + * 9) We could consider smaller types (e.g. short) for the version counting and think about some reset strategy before + * overflows, when there is no snapshot running. However, this would have to touch all entries in the map. 
+ * <p> + * This class was initially based on the {@link java.util.HashMap} implementation of the Android JDK, but is now heavily + * customized towards the use case of table for state entries. + * + * IMPORTANT: the contracts for this class rely on the user not holding any references to objects returned by this map + * beyond the life cycle of per-element operations. Or phrased differently, all get-update-put operations on a mapping + * should be within one call of processElement. Otherwise, the user must take care of taking deep copies, e.g. for + * caching purposes. + * + * @param <K> type of key. + * @param <N> type of namespace. + * @param <S> type of value. + */ +public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implements Iterable<StateEntry<K, N, S>> { + + /** + * The logger. + */ + private static final Logger LOG = LoggerFactory.getLogger(AsyncHeapKeyedStateBackend.class); + + /** + * Min capacity (other than zero) for a {@link CopyOnWriteStateTable}. Must be a power of two + * greater than 1 (and less than 1 << 30). + */ + private static final int MINIMUM_CAPACITY = 4; + + /** + * Max capacity for a {@link CopyOnWriteStateTable}. Must be a power of two >= MINIMUM_CAPACITY. + */ + private static final int MAXIMUM_CAPACITY = 1 << 30; + + /** + * Minimum number of entries that one step of incremental rehashing migrates from the old to the new sub-table. + */ + private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4; + + /** + * An empty table shared by all zero-capacity maps (typically from default + * constructor). It is never written to, and replaced on first put. Its size + * is set to half the minimum, so that the first resize will create a + * minimum-sized table. + */ + private static final StateTableEntry<?, ?, ?>[] EMPTY_TABLE = new StateTableEntry[MINIMUM_CAPACITY >>> 1]; + + /** + * Empty entry that we use to bootstrap our StateEntryIterator. 
+ */ + private static final StateTableEntry<?, ?, ?> ITERATOR_BOOTSTRAP_ENTRY = new StateTableEntry<>(); + + /** + * Maintains an ordered set of version ids that are still in use by unreleased snapshots. + */ + private final TreeSet<Integer> snapshotVersions; + + /** + * This is the primary entry array (hash directory) of the state table. If no incremental rehash is ongoing, this + * is the only used table. + **/ + private StateTableEntry<K, N, S>[] primaryTable; + + /** + * We maintain a secondary entry array while performing an incremental rehash. The purpose is to slowly migrate + * entries from the primary table to this resized table array. When all entries are migrated, this becomes the new + * primary table. + */ + private StateTableEntry<K, N, S>[] incrementalRehashTable; + + /** + * The current number of mappings in the primary table. + */ + private int primaryTableSize; + + /** + * The current number of mappings in the rehash table. + */ + private int incrementalRehashTableSize; + + /** + * The next index for a step of incremental rehashing in the primary table. + */ + private int rehashIndex; + + /** + * The current version of this map. Used for copy-on-write mechanics. + */ + private int stateTableVersion; + + /** + * The highest version of this map that is still required by any unreleased snapshot. + */ + private int highestRequiredSnapshotVersion; + + /** + * The last namespace that was actually inserted. This is a small optimization to reduce duplicate namespace objects. + */ + private N lastNamespace; + + /** + * The {@link CopyOnWriteStateTable} is rehashed when its size exceeds this threshold. + * The value of this field is generally .75 * capacity, except when + * the capacity is zero, as described in the EMPTY_TABLE declaration + * above. + */ + private int threshold; + + /** + * Incremented by "structural modifications" to allow (best effort) + * detection of concurrent modification. 
+ */ + private int modCount; + + /** + * Constructs a new {@code StateTable} with default capacity of 1024. + * + * @param keyContext the key context. + * @param metaInfo the meta information, including the type serializer for state copy-on-write. + */ + CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) { + this(keyContext, metaInfo, 1024); + } + + /** + * Constructs a new {@code StateTable} instance with the specified capacity. + * + * @param keyContext the key context. + * @param metaInfo the meta information, including the type serializer for state copy-on-write. + * @param capacity the initial capacity of this hash map. + * @throws IllegalArgumentException when the capacity is less than zero. + */ + @SuppressWarnings("unchecked") + private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo, int capacity) { + super(keyContext, metaInfo); + + // initialized tables to EMPTY_TABLE. + this.primaryTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE; + this.incrementalRehashTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE; + + // initialize sizes to 0. + this.primaryTableSize = 0; + this.incrementalRehashTableSize = 0; + + this.rehashIndex = 0; + this.stateTableVersion = 0; + this.highestRequiredSnapshotVersion = 0; + this.snapshotVersions = new TreeSet<>(); + + if (capacity < 0) { + throw new IllegalArgumentException("Capacity: " + capacity); + } + + if (capacity == 0) { + threshold = -1; + return; + } + + if (capacity < MINIMUM_CAPACITY) { + capacity = MINIMUM_CAPACITY; + } else if (capacity > MAXIMUM_CAPACITY) { + capacity = MAXIMUM_CAPACITY; + } else { + capacity = MathUtils.roundUpToPowerOfTwo(capacity); + } + primaryTable = makeTable(capacity); + } + + // Public API from AbstractStateTable ------------------------------------------------------------------------------ + + /** + * Returns the total number of entries in this {@link CopyOnWriteStateTable}. 
This is the sum of both sub-tables. + * + * @return the number of entries in this {@link CopyOnWriteStateTable}. + */ + @Override + public int size() { + return primaryTableSize + incrementalRehashTableSize; + } + + @Override + public S get(K key, N namespace) { + + final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace); + final int requiredVersion = highestRequiredSnapshotVersion; + final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash); + int index = hash & (tab.length - 1); + + for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) { + final K eKey = e.key; + final N eNamespace = e.namespace; + if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) { + + // copy-on-write check for state + if (e.stateVersion < requiredVersion) { + // copy-on-write check for entry + if (e.entryVersion < requiredVersion) { + e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e); + } + e.stateVersion = stateTableVersion; + e.state = getStateSerializer().copy(e.state); + } + + return e.state; + } + } + + return null; + } + + @Override + public void put(K key, int keyGroup, N namespace, S state) { + put(key, namespace, state); + } + + @Override + public S get(N namespace) { + return get(keyContext.getCurrentKey(), namespace); + } + + @Override + public boolean containsKey(N namespace) { + return containsKey(keyContext.getCurrentKey(), namespace); + } + + @Override + public void put(N namespace, S state) { + put(keyContext.getCurrentKey(), namespace, state); + } + + @Override + public S putAndGetOld(N namespace, S state) { + return putAndGetOld(keyContext.getCurrentKey(), namespace, state); + } + + @Override + public void remove(N namespace) { + remove(keyContext.getCurrentKey(), namespace); + } + + @Override + public S removeAndGetOld(N namespace) { + return removeAndGetOld(keyContext.getCurrentKey(), namespace); + } + + @Override + public <T> void transform(N namespace, T value, 
StateTransformationFunction<S, T> transformation) throws Exception { + transform(keyContext.getCurrentKey(), namespace, value, transformation); + } + + // Private implementation details of the API methods --------------------------------------------------------------- + + /** + * Returns whether this table contains the specified key/namespace composite key. + * + * @param key the key in the composite key to search for. Not null. + * @param namespace the namespace in the composite key to search for. Not null. + * @return {@code true} if this map contains the specified key/namespace composite key, + * {@code false} otherwise. + */ + boolean containsKey(K key, N namespace) { + + final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace); + final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash); + int index = hash & (tab.length - 1); + + for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) { + final K eKey = e.key; + final N eNamespace = e.namespace; + + if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) { + return true; + } + } + return false; + } + + /** + * Maps the specified key/namespace composite key to the specified value. This method should be preferred + * over {@link #putAndGetOld(Object, Object, Object)} when the caller is not interested + * in the old value, because this can potentially reduce copy-on-write activity. + * + * @param key the key. Not null. + * @param namespace the namespace. Not null. + * @param value the value. Can be null. + */ + void put(K key, N namespace, S value) { + final StateTableEntry<K, N, S> e = putEntry(key, namespace); + + e.state = value; + e.stateVersion = stateTableVersion; + } + + /** + * Maps the specified key/namespace composite key to the specified value. Returns the previous state that was + * registered under the composite key. + * + * @param key the key. Not null. + * @param namespace the namespace. Not null. 
+ * @param value the value. Can be null. + * @return the value of any previous mapping with the specified key or + * {@code null} if there was no such mapping. + */ + S putAndGetOld(K key, N namespace, S value) { + + final StateTableEntry<K, N, S> e = putEntry(key, namespace); + + // copy-on-write check for state + S oldState = (e.stateVersion < highestRequiredSnapshotVersion) ? + getStateSerializer().copy(e.state) : + e.state; + + e.state = value; + e.stateVersion = stateTableVersion; + + return oldState; + } + + /** + * Removes the mapping with the specified key/namespace composite key from this map. This method should be preferred + * over {@link #removeAndGetOld(Object, Object)} when the caller is not interested in the old value, because this + * can potentially reduce copy-on-write activity. + * + * @param key the key of the mapping to remove. Not null. + * @param namespace the namespace of the mapping to remove. Not null. + */ + void remove(K key, N namespace) { + removeEntry(key, namespace); + } + + /** + * Removes the mapping with the specified key/namespace composite key from this map, returning the state that was + * found under the entry. + * + * @param key the key of the mapping to remove. Not null. + * @param namespace the namespace of the mapping to remove. Not null. + * @return the value of the removed mapping or {@code null} if no mapping + * for the specified key was found. + */ + S removeAndGetOld(K key, N namespace) { + + final StateTableEntry<K, N, S> e = removeEntry(key, namespace); + + return e != null ? + // copy-on-write check for state + (e.stateVersion < highestRequiredSnapshotVersion ? + getStateSerializer().copy(e.state) : + e.state) : + null; + } + + /** + * Applies the given {@link StateTransformationFunction} to the state that is registered under the specified + * key/namespace composite key, with the given value as second input, and stores the result as the new state. + * If no mapping exists yet, an entry is created first and the function is applied to a {@code null} old state. + * + * @param key the key of the mapping to transform. Not null. + * @param namespace the namespace of the mapping to transform. Not null. + * @param value the value that is the second input for the transformation. 
+ * @param transformation the transformation function to apply on the old state and the given value. + * @param <T> type of the value that is the second input to the {@link StateTransformationFunction}. + * @throws Exception exception that happens on applying the function. + * @see #transform(Object, Object, StateTransformationFunction). + */ + <T> void transform( + K key, + N namespace, + T value, + StateTransformationFunction<S, T> transformation) throws Exception { + + final StateTableEntry<K, N, S> entry = putEntry(key, namespace); + + // copy-on-write check for state + entry.state = transformation.apply( + (entry.stateVersion < highestRequiredSnapshotVersion) ? + getStateSerializer().copy(entry.state) : + entry.state, + value); + entry.stateVersion = stateTableVersion; + } + + /** + * Helper method that is the basis for operations that add mappings. Returns the existing entry for the + * key/namespace composite key (replaced by a copy if the entry requires copy-on-write), or a newly inserted + * entry with {@code null} state if no mapping existed. + */ + private StateTableEntry<K, N, S> putEntry(K key, N namespace) { + + final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace); + final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash); + int index = hash & (tab.length - 1); + + for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) { + if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) { + + // copy-on-write check for entry + if (e.entryVersion < highestRequiredSnapshotVersion) { + e = handleChainedEntryCopyOnWrite(tab, index, e); + } + + return e; + } + } + + ++modCount; + if (size() > threshold) { + doubleCapacity(); + } + + return addNewStateTableEntry(tab, key, namespace, hash); + } + + /** + * Helper method that is the basis for operations that remove mappings. 
+ */ + private StateTableEntry<K, N, S> removeEntry(K key, N namespace) { + + final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace); + final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash); + int index = hash & (tab.length - 1); + + for (StateTableEntry<K, N, S> e = tab[index], prev = null; e != null; prev = e, e = e.next) { + if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) { + if (prev == null) { + tab[index] = e.next; + } else { + // copy-on-write check for entry + if (prev.entryVersion < highestRequiredSnapshotVersion) { + prev = handleChainedEntryCopyOnWrite(tab, index, prev); + } + prev.next = e.next; + } + ++modCount; + if (tab == primaryTable) { + --primaryTableSize; + } else { + --incrementalRehashTableSize; + } + return e; + } + } + return null; + } + + private void checkKeyNamespacePreconditions(K key, N namespace) { + Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context."); + Preconditions.checkNotNull(namespace, "Provided namespace is null."); + } + + // Meta data setter / getter and toString -------------------------------------------------------------------------- + + @Override + public TypeSerializer<S> getStateSerializer() { + return metaInfo.getStateSerializer(); + } + + @Override + public TypeSerializer<N> getNamespaceSerializer() { + return metaInfo.getNamespaceSerializer(); + } + + @Override + public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() { + return metaInfo; + } + + @Override + public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) { + this.metaInfo = metaInfo; + } + + // Iteration ------------------------------------------------------------------------------------------------------ + + @Override + public Iterator<StateEntry<K, N, S>> iterator() { + return new StateEntryIterator(); + } + + // Private utility functions for StateTable management ------------------------------------------------------------- + 
+ /** + * Releases the snapshot with the given version: the version is removed from the set of versions that are + * still in use, and {@code highestRequiredSnapshotVersion} is reset to the highest remaining version, or to + * 0 if no snapshot is left. Fails the state check if the given version is not registered. + * + * @param snapshotVersion the version of the snapshot to release. + * @see #releaseSnapshot(CopyOnWriteStateTableSnapshot) + */ + @VisibleForTesting + void releaseSnapshot(int snapshotVersion) { + // we guard against concurrent modifications of highestRequiredSnapshotVersion between snapshot and release. + // Only stale reads of the result of #releaseSnapshot calls are ok. + synchronized (snapshotVersions) { + Preconditions.checkState(snapshotVersions.remove(snapshotVersion), "Attempt to release unknown snapshot version"); + highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ? 0 : snapshotVersions.last(); + } + } + + /** + * Creates a (combined) copy of the table arrays for a snapshot. This method must be called by the same Thread that + * does modifications to the {@link CopyOnWriteStateTable}. + */ + @VisibleForTesting + @SuppressWarnings("unchecked") + StateTableEntry<K, N, S>[] snapshotTableArrays() { + + // we guard against concurrent modifications of highestRequiredSnapshotVersion between snapshot and release. + // Only stale reads of the result of #releaseSnapshot calls are ok. This is why we must call this method + // from the same thread that does all the modifications to the table. + synchronized (snapshotVersions) { + + // increase the table version for copy-on-write and register the snapshot + if (++stateTableVersion < 0) { + // this is just a safety net against overflows, but should never happen in practice (i.e., only after 2^31 snapshots) + throw new IllegalStateException("Version count overflow in CopyOnWriteStateTable. 
Enforcing restart."); + } + + highestRequiredSnapshotVersion = stateTableVersion; + snapshotVersions.add(highestRequiredSnapshotVersion); + } + + StateTableEntry<K, N, S>[] table = primaryTable; + if (isRehashing()) { + // consider both tables for the snapshot, the rehash index tells us which part of the two tables we need + final int localRehashIndex = rehashIndex; + final int localCopyLength = table.length - localRehashIndex; + StateTableEntry<K, N, S>[] copy = new StateTableEntry[localRehashIndex + table.length]; + // for the primary table, take every index >= rhIdx. + System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength); + + // for the new table, we are sure that two regions contain all the entries: + // [0, rhIdx[ AND [table.length / 2, table.length / 2 + rhIdx[ + table = incrementalRehashTable; + System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex); + System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex); + + return copy; + } else { + // we only need to copy the primary table + return Arrays.copyOf(table, table.length); + } + } + + /** + * Allocate a table of the given capacity and set the threshold accordingly. + * + * @param newCapacity must be a power of two + */ + private StateTableEntry<K, N, S>[] makeTable(int newCapacity) { + + if (MAXIMUM_CAPACITY == newCapacity) { + LOG.warn("Maximum capacity of 2^30 in StateTable reached. Cannot increase hash table size. This can lead " + + "to more collisions and lower performance. Please consider scaling-out your job or using a " + + "different keyed state backend implementation!"); + } + + threshold = (newCapacity >> 1) + (newCapacity >> 2); // 3/4 capacity + @SuppressWarnings("unchecked") StateTableEntry<K, N, S>[] newTable + = (StateTableEntry<K, N, S>[]) new StateTableEntry[newCapacity]; + return newTable; + } + + /** + * Creates and inserts a new {@link StateTableEntry}. 
+ */ + private StateTableEntry<K, N, S> addNewStateTableEntry( + StateTableEntry<K, N, S>[] table, + K key, + N namespace, + int hash) { + + // small optimization that aims to avoid holding references on duplicate namespace objects + if (namespace.equals(lastNamespace)) { + namespace = lastNamespace; + } else { + lastNamespace = namespace; + } + + int index = hash & (table.length - 1); + StateTableEntry<K, N, S> newEntry = new StateTableEntry<>( + key, + namespace, + null, + hash, + table[index], + stateTableVersion, + stateTableVersion); + table[index] = newEntry; + + if (table == primaryTable) { + ++primaryTableSize; + } else { + ++incrementalRehashTableSize; + } + return newEntry; + } + + /** + * Select the sub-table which is responsible for entries with the given hash code. + * + * @param hashCode the hash code which we use to decide about the table that is responsible. + * @return the sub-table (primary table or incremental rehash table) that is responsible for the entry with + * the given hash code. + */ + private StateTableEntry<K, N, S>[] selectActiveTable(int hashCode) { + return (hashCode & (primaryTable.length - 1)) >= rehashIndex ? primaryTable : incrementalRehashTable; + } + + /** + * Starts doubling the capacity of the hash table by allocating the incremental rehash table with twice the + * capacity of the primary table. Existing entries are not moved by this method; they are migrated to the + * enlarged table by subsequent steps of incremental rehashing. If the current capacity is + * MAXIMUM_CAPACITY, this method is a no-op. Must not be called while a rehash is already in progress. + */ + private void doubleCapacity() { + + // There can only be one rehash in flight. From the amount of incremental rehash steps we take, this should always hold. + Preconditions.checkState(!isRehashing(), "There is already a rehash in progress."); + + StateTableEntry<K, N, S>[] oldTable = primaryTable; + + int oldCapacity = oldTable.length; + + if (oldCapacity == MAXIMUM_CAPACITY) { + return; + } + + incrementalRehashTable = makeTable(oldCapacity * 2); + } + + /** + * Returns true, if an incremental rehash is in progress. 
+ */ + @VisibleForTesting + boolean isRehashing() { + // if we rehash, the secondary table is not empty + return EMPTY_TABLE != incrementalRehashTable; + } + + /** + * Computes the hash for the composite of key and namespace and performs some steps of incremental rehash if + * incremental rehashing is in progress. + */ + private int computeHashForOperationAndDoIncrementalRehash(K key, N namespace) { + + checkKeyNamespacePreconditions(key, namespace); + + if (isRehashing()) { + incrementalRehash(); + } + + return compositeHash(key, namespace); + } + + /** + * Runs a number of steps for incremental rehashing. Whole buckets are migrated from the primary table to the + * incremental rehash table, starting at the current rehash index, until at least + * MIN_TRANSFERRED_PER_INCREMENTAL_REHASH entries were transferred. If the end of the primary table is reached, + * the rehash is complete and the incremental rehash table becomes the new primary table. + */ + @SuppressWarnings("unchecked") + private void incrementalRehash() { + + StateTableEntry<K, N, S>[] oldTable = primaryTable; + StateTableEntry<K, N, S>[] newTable = incrementalRehashTable; + + int oldCapacity = oldTable.length; + int newMask = newTable.length - 1; + int requiredVersion = highestRequiredSnapshotVersion; + int rhIdx = rehashIndex; + int transferred = 0; + + // we migrate a certain minimum amount of entries from the old to the new table + while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) { + + StateTableEntry<K, N, S> e = oldTable[rhIdx]; + + while (e != null) { + // copy-on-write check for entry + if (e.entryVersion < requiredVersion) { + e = new StateTableEntry<>(e, stateTableVersion); + } + StateTableEntry<K, N, S> n = e.next; + int pos = e.hash & newMask; + e.next = newTable[pos]; + newTable[pos] = e; + e = n; + ++transferred; + } + + oldTable[rhIdx] = null; + if (++rhIdx == oldCapacity) { + //here, the rehash is complete and we release resources and reset fields + primaryTable = newTable; + incrementalRehashTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE; + primaryTableSize += incrementalRehashTableSize; + incrementalRehashTableSize = 0; + rehashIndex = 0; + return; + } + } + + // sync our local bookkeeping with the official bookkeeping fields + primaryTableSize -= transferred; + incrementalRehashTableSize += transferred; + 
rehashIndex = rhIdx; + } + + /** + * Perform copy-on-write for entry chains. We iterate the (hopefully and probably) still cached chain, replace + * all links up to the 'untilEntry', which we actually wanted to modify. + * + * @param tab the table array that contains the chain. + * @param tableIdx the index of the chain's bucket in the given table array. + * @param untilEntry the entry up to which (inclusive) all chain entries with an entry version lower than the + * highest required snapshot version are replaced by copies. + * @return the entry that takes the place of 'untilEntry' in the chain: a new copy if 'untilEntry' required + * copy-on-write, otherwise 'untilEntry' itself. + */ + private StateTableEntry<K, N, S> handleChainedEntryCopyOnWrite( + StateTableEntry<K, N, S>[] tab, + int tableIdx, + StateTableEntry<K, N, S> untilEntry) { + + final int required = highestRequiredSnapshotVersion; + + StateTableEntry<K, N, S> current = tab[tableIdx]; + StateTableEntry<K, N, S> copy; + + if (current.entryVersion < required) { + copy = new StateTableEntry<>(current, stateTableVersion); + tab[tableIdx] = copy; + } else { + // nothing to do, just advance copy to current + copy = current; + } + + // we iterate the chain up to 'until entry' + while (current != untilEntry) { + + //advance current + current = current.next; + + if (current.entryVersion < required) { + // copy and advance the current's copy + copy.next = new StateTableEntry<>(current, stateTableVersion); + copy = copy.next; + } else { + // nothing to do, just advance copy to current + copy = current; + } + } + + return copy; + } + + @SuppressWarnings("unchecked") + private static <K, N, S> StateTableEntry<K, N, S> getBootstrapEntry() { + return (StateTableEntry<K, N, S>) ITERATOR_BOOTSTRAP_ENTRY; + } + + /** + * Helper function that creates and scrambles a composite hash for key and namespace. + */ + private static int compositeHash(Object key, Object namespace) { + // create composite key through XOR, then apply some bit-mixing for better distribution of skewed keys. + return MathUtils.bitMix(key.hashCode() ^ namespace.hashCode()); + } + + // Snapshotting ---------------------------------------------------------------------------------------------------- + + int getStateTableVersion() { + return stateTableVersion; + } + + /** + * Creates a snapshot of this {@link CopyOnWriteStateTable}, to be written in checkpointing. 
The snapshot integrity + * is protected through copy-on-write from the {@link CopyOnWriteStateTable}. Users should call + * {@link #releaseSnapshot(CopyOnWriteStateTableSnapshot)} after using the returned object. + * + * @return a snapshot from this {@link CopyOnWriteStateTable}, for checkpointing. + */ + @Override + public CopyOnWriteStateTableSnapshot<K, N, S> createSnapshot() { + return new CopyOnWriteStateTableSnapshot<>(this); + } + + /** + * Releases a snapshot for this {@link CopyOnWriteStateTable}. This method should be called once a snapshot is no more needed, + * so that the {@link CopyOnWriteStateTable} can stop considering this snapshot for copy-on-write, thus avoiding unnecessary + * object creation. + * + * @param snapshotToRelease the snapshot to release, which was previously created by this state table. + */ + void releaseSnapshot(CopyOnWriteStateTableSnapshot<K, N, S> snapshotToRelease) { + + Preconditions.checkArgument(snapshotToRelease.isOwner(this), + "Cannot release snapshot which is owned by a different state table."); + + releaseSnapshot(snapshotToRelease.getSnapshotVersion()); + } + + // StateTableEntry ------------------------------------------------------------------------------------------------- + + /** + * One entry in the {@link CopyOnWriteStateTable}. This is a triplet of key, namespace, and state. Thereby, key and + * namespace together serve as a composite key for the state. This class also contains some management meta data for + * copy-on-write, a pointer to link other {@link StateTableEntry}s to a list, and cached hash code. + * + * @param <K> type of key. + * @param <N> type of namespace. + * @param <S> type of state. + */ + static class StateTableEntry<K, N, S> implements StateEntry<K, N, S> { + + /** + * The key. Assumed to be immutable and not null. + */ + final K key; + + /** + * The namespace. Assumed to be immutable and not null. + */ + final N namespace; + + /** + * The state. 
This is not final to allow exchanging the object for copy-on-write. Can be null. + */ + S state; + + /** + * Link to another {@link StateTableEntry}. This is used to resolve collisions in the + * {@link CopyOnWriteStateTable} through chaining. + */ + StateTableEntry<K, N, S> next; + + /** + * The version of this {@link StateTableEntry}. This is meta data for copy-on-write of the table structure. + */ + int entryVersion; + + /** + * The version of the state object in this entry. This is meta data for copy-on-write of the state object itself. + */ + int stateVersion; + + /** + * The computed secondary hash for the composite of key and namespace. + */ + final int hash; + + StateTableEntry() { + this(null, null, null, 0, null, 0, 0); + } + + StateTableEntry(StateTableEntry<K, N, S> other, int entryVersion) { + this(other.key, other.namespace, other.state, other.hash, other.next, entryVersion, other.stateVersion); + } + + StateTableEntry( + K key, + N namespace, + S state, + int hash, + StateTableEntry<K, N, S> next, + int entryVersion, + int stateVersion) { + this.key = key; + this.namespace = namespace; + this.hash = hash; + this.next = next; + this.entryVersion = entryVersion; + this.state = state; + this.stateVersion = stateVersion; + } + + public final void setState(S value, int mapVersion) { + // naturally, we can update the state version every time we replace the old state with a different object + if (value != state) { + this.state = value; + this.stateVersion = mapVersion; + } + } + + @Override + public K getKey() { + return key; + } + + @Override + public N getNamespace() { + return namespace; + } + + @Override + public S getState() { + return state; + } + + @Override + public final boolean equals(Object o) { + if (!(o instanceof CopyOnWriteStateTable.StateTableEntry)) { + return false; + } + + StateEntry<?, ?, ?> e = (StateEntry<?, ?, ?>) o; + return e.getKey().equals(key) + && e.getNamespace().equals(namespace) + && Objects.equals(e.getState(), state); + } + 
+ @Override + public final int hashCode() { + return (key.hashCode() ^ namespace.hashCode()) ^ Objects.hashCode(state); + } + + @Override + public final String toString() { + return "(" + key + "|" + namespace + ")=" + state; + } + } + + // For testing ---------------------------------------------------------------------------------------------------- + + @Override + public int sizeOfNamespace(Object namespace) { + int count = 0; + for (StateEntry<K, N, S> entry : this) { + if (null != entry && namespace.equals(entry.getNamespace())) { + ++count; + } + } + return count; + } + + + // StateEntryIterator --------------------------------------------------------------------------------------------- + + /** + * Iterator over the entries in a {@link CopyOnWriteStateTable}. + */ + class StateEntryIterator implements Iterator<StateEntry<K, N, S>> { + private StateTableEntry<K, N, S>[] activeTable; + private int nextTablePosition; + private StateTableEntry<K, N, S> nextEntry; + private int expectedModCount = modCount; + + StateEntryIterator() { + this.activeTable = primaryTable; + this.nextTablePosition = 0; + this.expectedModCount = modCount; + this.nextEntry = getBootstrapEntry(); + advanceIterator(); + } + + private StateTableEntry<K, N, S> advanceIterator() { + + StateTableEntry<K, N, S> entryToReturn = nextEntry; + StateTableEntry<K, N, S> next = entryToReturn.next; + + // consider both sub-tables tables to cover the case of rehash + while (next == null) { + + StateTableEntry<K, N, S>[] tab = activeTable; + + while (nextTablePosition < tab.length) { + next = tab[nextTablePosition++]; + + if (next != null) { + nextEntry = next; + return entryToReturn; + } + } + + if (activeTable == incrementalRehashTable) { + break; + } + + activeTable = incrementalRehashTable; + nextTablePosition = 0; + } + + nextEntry = next; + return entryToReturn; + } + + @Override + public boolean hasNext() { + return nextEntry != null; + } + + @Override + public StateTableEntry<K, N, S> next() { + 
if (modCount != expectedModCount) { + throw new ConcurrentModificationException(); + } + + if (nextEntry == null) { + throw new NoSuchElementException(); + } + + return advanceIterator(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read-only iterator"); + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableSnapshot.java new file mode 100644 index 0000000..db3b197 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/CopyOnWriteStateTableSnapshot.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import java.io.IOException; + +/** + * This class represents the snapshot of a {@link CopyOnWriteStateTable} and has a role in operator state checkpointing. 
Besides + * holding the {@link CopyOnWriteStateTable}'s internal entries at the time of the snapshot, this class is also responsible for + * preparing and writing the state in the process of checkpointing. + * <p> + * IMPORTANT: Please notice that snapshot integrity of entries in this class relies on proper copy-on-write semantics + * through the {@link CopyOnWriteStateTable} that created the snapshot object, but all objects in this snapshot must be considered + * as READ-ONLY! The reason is that the objects held by this class may or may not be deep copies of original objects + * that may still be used in the {@link CopyOnWriteStateTable}. This depends for each entry on whether or not it was subject to + * copy-on-write operations by the {@link CopyOnWriteStateTable}. Phrased differently: the {@link CopyOnWriteStateTable} provides + * copy-on-write isolation for this snapshot, but this snapshot does not isolate modifications from the + * {@link CopyOnWriteStateTable}! + * + * @param <K> type of key + * @param <N> type of namespace + * @param <S> type of state + */ +@Internal +public class CopyOnWriteStateTableSnapshot<K, N, S> + extends AbstractStateTableSnapshot<K, N, S, CopyOnWriteStateTable<K, N, S>> { + + /** + * Version of the {@link CopyOnWriteStateTable} when this snapshot was created. This can be used to release the snapshot. + */ + private final int snapshotVersion; + + /** + * The number of entries in the {@link CopyOnWriteStateTable} at the time of creating this snapshot. + */ + private final int stateTableSize; + + /** + * The state table entries, as by the time this snapshot was created. Objects in this array may or may not be deep + * copies of the current entries in the {@link CopyOnWriteStateTable} that created this snapshot. This depends for each entry + * on whether or not it was subject to copy-on-write operations by the {@link CopyOnWriteStateTable}.
+ */ + private final CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshotData; + + /** + * Offsets for the individual key-groups. This is lazily created when the snapshot is grouped by key-group during + * the process of writing this snapshot to an output as part of checkpointing. + */ + private int[] keyGroupOffsets; + + /** + * Creates a new {@link CopyOnWriteStateTableSnapshot}. + * + * @param owningStateTable the {@link CopyOnWriteStateTable} for which this object represents a snapshot. + */ + CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S> owningStateTable) { + + super(owningStateTable); + this.snapshotData = owningStateTable.snapshotTableArrays(); + this.snapshotVersion = owningStateTable.getStateTableVersion(); + this.stateTableSize = owningStateTable.size(); + this.keyGroupOffsets = null; + } + + /** + * Returns the internal version of the {@link CopyOnWriteStateTable} when this snapshot was created. This value must be used to + * tell the {@link CopyOnWriteStateTable} when to release this snapshot. + */ + int getSnapshotVersion() { + return snapshotVersion; + } + + /** + * Partitions the snapshot data by key-group. The algorithm first builds a histogram for the distribution of keys + * into key-groups. Then, the histogram is accumulated to obtain the boundaries of each key-group in an array. + * Last, we use the accumulated counts as write position pointers for the key-group's bins when reordering the + * entries by key-group. This operation is lazily performed before the first writing of a key-group. + * <p> + * As a possible future optimization, we could perform the repartitioning in-place, using a scheme similar to the + * cuckoo cycles in cuckoo hashing. This can trade some performance for a smaller memory footprint. 
+ */ + @SuppressWarnings("unchecked") + private void partitionEntriesByKeyGroup() { + + // We only have to perform this step once before the first key-group is written + if (null != keyGroupOffsets) { + return; + } + + final KeyGroupRange keyGroupRange = owningStateTable.keyContext.getKeyGroupRange(); + final int totalKeyGroups = owningStateTable.keyContext.getNumberOfKeyGroups(); + final int baseKgIdx = keyGroupRange.getStartKeyGroup(); + final int[] histogram = new int[keyGroupRange.getNumberOfKeyGroups() + 1]; + + CopyOnWriteStateTable.StateTableEntry<K, N, S>[] unfold = new CopyOnWriteStateTable.StateTableEntry[stateTableSize]; + + // 1) In this step we i) 'unfold' the linked list of entries to a flat array and ii) build a histogram for key-groups + int unfoldIndex = 0; + for (CopyOnWriteStateTable.StateTableEntry<K, N, S> entry : snapshotData) { + while (null != entry) { + int effectiveKgIdx = + KeyGroupRangeAssignment.computeKeyGroupForKeyHash(entry.key.hashCode(), totalKeyGroups) - baseKgIdx + 1; + ++histogram[effectiveKgIdx]; + unfold[unfoldIndex++] = entry; + entry = entry.next; + } + } + + // 2) We accumulate the histogram bins to obtain key-group ranges in the final array + for (int i = 1; i < histogram.length; ++i) { + histogram[i] += histogram[i - 1]; + } + + // 3) We repartition the entries by key-group, using the histogram values as write indexes + for (CopyOnWriteStateTable.StateTableEntry<K, N, S> t : unfold) { + int effectiveKgIdx = + KeyGroupRangeAssignment.computeKeyGroupForKeyHash(t.key.hashCode(), totalKeyGroups) - baseKgIdx; + snapshotData[histogram[effectiveKgIdx]++] = t; + } + + // 4) As byproduct, we also created the key-group offsets + this.keyGroupOffsets = histogram; + } + + @Override + public void release() { + owningStateTable.releaseSnapshot(this); + } + + @Override + public void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException { + + if (null == keyGroupOffsets) { + partitionEntriesByKeyGroup(); + } + + 
final CopyOnWriteStateTable.StateTableEntry<K, N, S>[] groupedOut = snapshotData; + KeyGroupRange keyGroupRange = owningStateTable.keyContext.getKeyGroupRange(); + int keyGroupOffsetIdx = keyGroupId - keyGroupRange.getStartKeyGroup() - 1; + int startOffset = keyGroupOffsetIdx < 0 ? 0 : keyGroupOffsets[keyGroupOffsetIdx]; + int endOffset = keyGroupOffsets[keyGroupOffsetIdx + 1]; + + TypeSerializer<K> keySerializer = owningStateTable.keyContext.getKeySerializer(); + TypeSerializer<N> namespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer(); + TypeSerializer<S> stateSerializer = owningStateTable.metaInfo.getStateSerializer(); + + // write number of mappings in key-group + dov.writeInt(endOffset - startOffset); + + // write mappings + for (int i = startOffset; i < endOffset; ++i) { + CopyOnWriteStateTable.StateTableEntry<K, N, S> toWrite = groupedOut[i]; + groupedOut[i] = null; // free asap for GC + namespaceSerializer.serialize(toWrite.namespace, dov); + keySerializer.serialize(toWrite.key, dov); + stateSerializer.serialize(toWrite.state, dov); + } + } + + /** + * Returns true iff the given state table is the owner of this snapshot object. + */ + boolean isOwner(CopyOnWriteStateTable<K, N, S> stateTable) { + return stateTable == owningStateTable; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapFoldingState.java new file mode 100644 index 0000000..ad955c3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapFoldingState.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * Heap-backed partitioned {@link FoldingState} that is + * snapshotted into files. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <T> The type of the values that can be folded into the state. + * @param <ACC> The type of the value in the folding state. + */ +public class HeapFoldingState<K, N, T, ACC> + extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> + implements FoldingState<T, ACC> { + + /** The function used to fold the state */ + private final FoldTransformation<T, ACC> foldTransformation; + + /** + * Creates a new key/value state for the given hash map of key/value pairs. + * + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. 
+ * @param stateTable The state table to use in this key/value state. May contain initial state. + */ + public HeapFoldingState( + FoldingStateDescriptor<T, ACC> stateDesc, + StateTable<K, N, ACC> stateTable, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) { + super(stateDesc, stateTable, keySerializer, namespaceSerializer); + this.foldTransformation = new FoldTransformation<>(stateDesc); + } + + // ------------------------------------------------------------------------ + // state access + // ------------------------------------------------------------------------ + + public ACC get() { + return stateTable.get(currentNamespace); + } + + public void add(T value) throws IOException { + + if (value == null) { + clear(); + return; + } + + try { + stateTable.transform(currentNamespace, value, foldTransformation); + } catch (Exception e) { + throw new IOException("Could not add value to folding state.", e); + } + } + + static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> { + + private final FoldingStateDescriptor<T, ACC> stateDescriptor; + private final FoldFunction<T, ACC> foldFunction; + + FoldTransformation(FoldingStateDescriptor<T, ACC> stateDesc) { + this.stateDescriptor = Preconditions.checkNotNull(stateDesc); + this.foldFunction = Preconditions.checkNotNull(stateDesc.getFoldFunction()); + } + + @Override + public ACC apply(ACC previousState, T value) throws Exception { + return foldFunction.fold((previousState != null) ?
previousState : stateDescriptor.getDefaultValue(), value); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapListState.java new file mode 100644 index 0000000..ab5fff5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapListState.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; + +/** + * Heap-backed partitioned {@link ListState} that is snapshotted + * into files. 
+ * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <V> The type of the value. + */ +public class HeapListState<K, N, V> + extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>> + implements ListState<V> { + + /** + * Creates a new key/value state for the given hash map of key/value pairs. + * + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param stateTable The state table to use in this key/value state. May contain initial state. + */ + public HeapListState( + ListStateDescriptor<V> stateDesc, + StateTable<K, N, ArrayList<V>> stateTable, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) { + super(stateDesc, stateTable, keySerializer, namespaceSerializer); + } + + // ------------------------------------------------------------------------ + // state access + // ------------------------------------------------------------------------ + + @Override + public Iterable<V> get() { + return stateTable.get(currentNamespace); + } + + @Override + public void add(V value) { + final N namespace = currentNamespace; + + if (value == null) { + clear(); + return; + } + + final StateTable<K, N, ArrayList<V>> map = stateTable; + ArrayList<V> list = map.get(namespace); + + if (list == null) { + list = new ArrayList<>(); + map.put(namespace, list); + } + list.add(value); + } + + @Override + public byte[] getSerializedValue(K key, N namespace) throws Exception { + Preconditions.checkState(namespace != null, "No namespace given."); + Preconditions.checkState(key != null, "No key given."); + + ArrayList<V> result = stateTable.get(key, namespace); + + if (result == null) { + return null; + } + + TypeSerializer<V> serializer = stateDesc.getSerializer(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos); + + //
write the same as RocksDB writes lists, with one ',' separator + for (int i = 0; i < result.size(); i++) { + serializer.serialize(result.get(i), view); + if (i < result.size() -1) { + view.writeByte(','); + } + } + view.flush(); + + return baos.toByteArray(); + } + + // ------------------------------------------------------------------------ + // state merging + // ------------------------------------------------------------------------ + + @Override + protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) { + a.addAll(b); + return a; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapReducingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapReducingState.java new file mode 100644 index 0000000..b6eed74 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapReducingState.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * Heap-backed partitioned {@link ReducingState} that is + * snapshotted into files. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <V> The type of the value. + */ +public class HeapReducingState<K, N, V> + extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>> + implements ReducingState<V> { + + private final ReduceTransformation<V> reduceTransformation; + + /** + * Creates a new key/value state for the given hash map of key/value pairs. + * + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param stateTable The state table to use in this key/value state. May contain initial state.
+ */ + public HeapReducingState( + ReducingStateDescriptor<V> stateDesc, + StateTable<K, N, V> stateTable, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) { + + super(stateDesc, stateTable, keySerializer, namespaceSerializer); + this.reduceTransformation = new ReduceTransformation<>(stateDesc.getReduceFunction()); + } + + // ------------------------------------------------------------------------ + // state access + // ------------------------------------------------------------------------ + + @Override + public V get() { + return stateTable.get(currentNamespace); + } + + @Override + public void add(V value) throws IOException { + + if (value == null) { + clear(); + return; + } + + try { + stateTable.transform(currentNamespace, value, reduceTransformation); + } catch (Exception e) { + throw new IOException("Exception while applying ReduceFunction in reducing state", e); + } + } + + // ------------------------------------------------------------------------ + // state merging + // ------------------------------------------------------------------------ + + @Override + protected V mergeState(V a, V b) throws Exception { + return reduceTransformation.apply(a, b); + } + + static final class ReduceTransformation<V> implements StateTransformationFunction<V, V> { + + private final ReduceFunction<V> reduceFunction; + + ReduceTransformation(ReduceFunction<V> reduceFunction) { + this.reduceFunction = Preconditions.checkNotNull(reduceFunction); + } + + @Override + public V apply(V previousState, V value) throws Exception { + return previousState != null ? 
reduceFunction.reduce(previousState, value) : value; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapValueState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapValueState.java new file mode 100644 index 0000000..436c20e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/HeapValueState.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * Heap-backed partitioned {@link ValueState} that is snapshotted + * into files. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <V> The type of the value. 
+ */ +public class HeapValueState<K, N, V> + extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>> + implements ValueState<V> { + + /** + * Creates a new key/value state for the given hash map of key/value pairs. + * + * @param stateDesc The state identifier for the state. This contains name + * and can create a default state value. + * @param stateTable The state table to use in this key/value state. May contain initial state. + */ + public HeapValueState( + ValueStateDescriptor<V> stateDesc, + StateTable<K, N, V> stateTable, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) { + super(stateDesc, stateTable, keySerializer, namespaceSerializer); + } + + @Override + public V value() { + final V result = stateTable.get(currentNamespace); + + if (result == null) { + return stateDesc.getDefaultValue(); + } + + return result; + } + + @Override + public void update(V value) { + + if (value == null) { + clear(); + return; + } + + stateTable.put(currentNamespace, value); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/InternalKeyContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/InternalKeyContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/InternalKeyContext.java new file mode 100644 index 0000000..bf988ee --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/InternalKeyContext.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.KeyGroupRange; + +/** + * This interface is the current context of a keyed state. It provides information about the currently selected key in + * the context, the corresponding key-group, and other key and key-grouping related information. + * <p> + * The typical use case for this interface is providing a view on the current-key selection aspects of + * {@link org.apache.flink.runtime.state.KeyedStateBackend}. + */ +@Internal +public interface InternalKeyContext<K> { + + /** + * Used by states to access the current key. + */ + K getCurrentKey(); + + /** + * Returns the key-group to which the current key belongs. + */ + int getCurrentKeyGroupIndex(); + + /** + * Returns the number of key-groups aka max parallelism. + */ + int getNumberOfKeyGroups(); + + /** + * Returns the key groups for this backend. + */ + KeyGroupRange getKeyGroupRange(); + + /** + * {@link TypeSerializer} for the state backend key type. 
+ */ + TypeSerializer<K> getKeySerializer(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateEntry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateEntry.java new file mode 100644 index 0000000..d32e825 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateEntry.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +/** + * Interface of entries in a state table. Entries are triple of key, namespace, and state. + * + * @param <K> type of key. + * @param <N> type of namespace. + * @param <S> type of state. + */ +public interface StateEntry<K, N, S> { + + /** + * Returns the key of this entry. + */ + K getKey(); + + /** + * Returns the namespace of this entry. + */ + N getNamespace(); + + /** + * Returns the state of this entry. 
+ */ + S getState(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTable.java new file mode 100644 index 0000000..c1db7e8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTable.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.util.Preconditions; + +/** + * Base class for state tables. Accesses to state are typically scoped by the currently active key, as provided + * through the {@link InternalKeyContext}. 
+ * + * @param <K> type of key + * @param <N> type of namespace + * @param <S> type of state + */ +public abstract class StateTable<K, N, S> { + + /** + * The key context view on the backend. This provides information, such as the currently active key. + */ + protected final InternalKeyContext<K> keyContext; + + /** + * Combined meta information such as name and serializers for this state. + */ + protected RegisteredBackendStateMetaInfo<N, S> metaInfo; + + /** + * + * @param keyContext the key context provides the key scope for all put/get/delete operations. + * @param metaInfo the meta information, including the type serializer for state copy-on-write. + */ + public StateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) { + this.keyContext = Preconditions.checkNotNull(keyContext); + this.metaInfo = Preconditions.checkNotNull(metaInfo); + } + + // Main interface methods of StateTable ------------------------------------------------------- + + /** + * Returns whether this {@link StateTable} is empty. + * + * @return {@code true} if this {@link StateTable} has no elements, {@code false} + * otherwise. + * @see #size() + */ + public boolean isEmpty() { + return size() == 0; + } + + /** + * Returns the total number of entries in this {@link StateTable}. This is the sum of both sub-tables. + * + * @return the number of entries in this {@link StateTable}. + */ + public abstract int size(); + + /** + * Returns the state of the mapping for the composite of active key and given namespace. + * + * @param namespace the namespace. Not null. + * @return the state of the mapping with the specified key/namespace composite key, or {@code null} + * if no mapping for the specified key is found. + */ + public abstract S get(N namespace); + + /** + * Returns whether this table contains a mapping for the composite of active key and given namespace. 
+ * + * @param namespace the namespace in the composite key to search for. Not null. + * @return {@code true} if this map contains the specified key/namespace composite key, + * {@code false} otherwise. + */ + public abstract boolean containsKey(N namespace); + + /** + * Maps the composite of active key and given namespace to the specified state. This method should be preferred + * over {@link #putAndGetOld(N, S)} when the caller is not interested in the old state. + * + * @param namespace the namespace. Not null. + * @param state the state. Can be null. + */ + public abstract void put(N namespace, S state); + + /** + * Maps the composite of active key and given namespace to the specified state. Returns the previous state that + * was registered under the composite key. + * + * @param namespace the namespace. Not null. + * @param state the state. Can be null. + * @return the state of any previous mapping with the specified key or + * {@code null} if there was no such mapping. + */ + public abstract S putAndGetOld(N namespace, S state); + + /** + * Removes the mapping for the composite of active key and given namespace. This method should be preferred + * over {@link #removeAndGetOld(N)} when the caller is not interested in the old state. + * + * @param namespace the namespace of the mapping to remove. Not null. + */ + public abstract void remove(N namespace); + + /** + * Removes the mapping for the composite of active key and given namespace, returning the state that was + * found under the entry. + * + * @param namespace the namespace of the mapping to remove. Not null. + * @return the state of the removed mapping or {@code null} if no mapping + * for the specified key was found. + */ + public abstract S removeAndGetOld(N namespace); + + /** + * Applies the given {@link StateTransformationFunction} to the state (1st input argument), using the given value as + * second input argument. 
The result of {@link StateTransformationFunction#apply(Object, Object)} is then stored as + * the new state. This function is basically an optimization for get-update-put pattern. + * + * @param namespace the namespace. Not null. + * @param value the value to use in transforming the state. Can be null. + * @param transformation the transformation function. + * @throws Exception if some exception happens in the transformation function. + */ + public abstract <T> void transform( + N namespace, + T value, + StateTransformationFunction<S, T> transformation) throws Exception; + + // For queryable state ------------------------------------------------------------------------ + + /** + * Returns the state for the composite of active key and given namespace. This is typically used by + * queryable state. + * + * @param key the key. Not null. + * @param namespace the namespace. Not null. + * @return the state of the mapping with the specified key/namespace composite key, or {@code null} + * if no mapping for the specified key is found. 
+ */ + public abstract S get(K key, N namespace); + + // Meta data setter / getter and toString ----------------------------------------------------- + + public TypeSerializer<S> getStateSerializer() { + return metaInfo.getStateSerializer(); + } + + public TypeSerializer<N> getNamespaceSerializer() { + return metaInfo.getNamespaceSerializer(); + } + + public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() { + return metaInfo; + } + + public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) { + this.metaInfo = metaInfo; + } + + // Snapshot / Restore ------------------------------------------------------------------------- + + abstract StateTableSnapshot createSnapshot(); + + public abstract void put(K key, int keyGroup, N namespace, S state); + + // For testing -------------------------------------------------------------------------------- + + @VisibleForTesting + public abstract int sizeOfNamespace(Object namespace); +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReader.java new file mode 100644 index 0000000..41f0abd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReader.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.core.memory.DataInputView; + +import java.io.IOException; + +/** + * Interface for state de-serialization into {@link org.apache.flink.runtime.state.heap.StateTable}s by key-group. + */ +interface StateTableByKeyGroupReader { + + /** + * Read the data for the specified key-group from the input. + * + * @param div the input + * @param keyGroupId the key-group to read + * @throws IOException on read related problems + */ + void readMappingsInKeyGroup(DataInputView div, int keyGroupId) throws IOException; +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReaders.java new file mode 100644 index 0000000..2b5f15a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableByKeyGroupReaders.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; + +import java.io.IOException; + +/** + * This class provides a static factory method to create different implementations of {@link StateTableByKeyGroupReader} + * depending on the provided serialization format version. + * <p> + * The implementations are also located here as inner classes. + */ +class StateTableByKeyGroupReaders { + + /** + * Creates a new StateTableByKeyGroupReader that inserts de-serialized mappings into the given table, using the + * de-serialization algorithm that matches the given version. + * + * @param table the {@link org.apache.flink.runtime.state.heap.StateTable} into which de-serialized mappings are inserted. + * @param version version for the de-serialization algorithm. + * @param <K> type of key. + * @param <N> type of namespace. + * @param <S> type of state. + * @return the appropriate reader. 
+ */ + static <K, N, S> StateTableByKeyGroupReader readerForVersion(StateTable<K, N, S> table, int version) { + switch (version) { + case 1: + return new StateTableByKeyGroupReaderV1<>(table); + case 2: + return new StateTableByKeyGroupReaderV2<>(table); + default: + throw new IllegalArgumentException("Unknown version: " + version); + } + } + + static abstract class AbstractStateTableByKeyGroupReader<K, N, S> + implements StateTableByKeyGroupReader { + + protected final StateTable<K, N, S> stateTable; + + AbstractStateTableByKeyGroupReader(StateTable<K, N, S> stateTable) { + this.stateTable = stateTable; + } + + @Override + public abstract void readMappingsInKeyGroup(DataInputView div, int keyGroupId) throws IOException; + + protected TypeSerializer<K> getKeySerializer() { + return stateTable.keyContext.getKeySerializer(); + } + + protected TypeSerializer<N> getNamespaceSerializer() { + return stateTable.getNamespaceSerializer(); + } + + protected TypeSerializer<S> getStateSerializer() { + return stateTable.getStateSerializer(); + } + } + + static final class StateTableByKeyGroupReaderV1<K, N, S> + extends AbstractStateTableByKeyGroupReader<K, N, S> { + + StateTableByKeyGroupReaderV1(StateTable<K, N, S> stateTable) { + super(stateTable); + } + + @Override + public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException { + + if (inView.readByte() == 0) { + return; + } + + final TypeSerializer<K> keySerializer = getKeySerializer(); + final TypeSerializer<N> namespaceSerializer = getNamespaceSerializer(); + final TypeSerializer<S> stateSerializer = getStateSerializer(); + + // V1 uses kind of namespace compressing format + int numNamespaces = inView.readInt(); + for (int k = 0; k < numNamespaces; k++) { + N namespace = namespaceSerializer.deserialize(inView); + int numEntries = inView.readInt(); + for (int l = 0; l < numEntries; l++) { + K key = keySerializer.deserialize(inView); + S state = stateSerializer.deserialize(inView); + 
stateTable.put(key, keyGroupId, namespace, state); + } + } + } + } + + private static final class StateTableByKeyGroupReaderV2<K, N, S> + extends AbstractStateTableByKeyGroupReader<K, N, S> { + + StateTableByKeyGroupReaderV2(StateTable<K, N, S> stateTable) { + super(stateTable); + } + + @Override + public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException { + + final TypeSerializer<K> keySerializer = getKeySerializer(); + final TypeSerializer<N> namespaceSerializer = getNamespaceSerializer(); + final TypeSerializer<S> stateSerializer = getStateSerializer(); + + int numKeys = inView.readInt(); + for (int i = 0; i < numKeys; ++i) { + N namespace = namespaceSerializer.deserialize(inView); + K key = keySerializer.deserialize(inView); + S state = stateSerializer.deserialize(inView); + stateTable.put(key, keyGroupId, namespace, state); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableSnapshot.java new file mode 100644 index 0000000..184cd59 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/async/StateTableSnapshot.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.async; + +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +/** + * Interface for the snapshots of a {@link org.apache.flink.runtime.state.heap.StateTable}. Offers a way to serialize the snapshot (by key-group). All + * snapshots should be released after usage. + */ +interface StateTableSnapshot { + + /** + * Writes the data for the specified key-group to the output. + * + * @param dov the output + * @param keyGroupId the key-group to write + * @throws IOException on write related problems + */ + void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException; + + /** + * Release the snapshot. All snapshots should be released when they are no longer used because some implementations + * can only release resources after a release. 
+ */ + void release(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6a80725/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/async/AsyncMemoryStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/async/AsyncMemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/async/AsyncMemoryStateBackend.java new file mode 100644 index 0000000..54a208a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/async/AsyncMemoryStateBackend.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.memory.async; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.heap.async.AsyncHeapKeyedStateBackend; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; + +import java.io.IOException; + +/** + * An {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no + * capabilities to spill to disk. Checkpoints are serialized and the serialized data is + * transferred to the JobManager. + */ +public class AsyncMemoryStateBackend extends AbstractStateBackend { + + private static final long serialVersionUID = 4109305377809414635L; + + /** The default maximal size that the snapshotted memory state may have (5 MiBytes) */ + private static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024; + + /** The maximal size that the snapshotted memory state may have */ + private final int maxStateSize; + + /** + * Creates a new memory state backend that accepts states whose serialized forms are + * up to the default state size (5 MB). + */ + public AsyncMemoryStateBackend() { + this(DEFAULT_MAX_STATE_SIZE); + } + + /** + * Creates a new memory state backend that accepts states whose serialized forms are + * up to the given number of bytes. 
+ * + * @param maxStateSize The maximal size of the serialized state + */ + public AsyncMemoryStateBackend(int maxStateSize) { + this.maxStateSize = maxStateSize; + } + + @Override + public String toString() { + return "MemoryStateBackend (data in heap memory / checkpoints to JobManager)"; + } + + @Override + public CheckpointStreamFactory createStreamFactory( + JobID jobId, String operatorIdentifier) throws IOException { + return new MemCheckpointStreamFactory(maxStateSize); + } + + @Override + public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( + Environment env, JobID jobID, + String operatorIdentifier, + TypeSerializer<K> keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry) throws IOException { + + return new AsyncHeapKeyedStateBackend<>( + kvStateRegistry, + keySerializer, + env.getUserClassLoader(), + numberOfKeyGroups, + keyGroupRange); + } +}
