StefanRRichter commented on a change in pull request #8611:
[FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
URL: https://github.com/apache/flink/pull/8611#discussion_r293555221
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
##########
@@ -219,985 +43,41 @@
InternalKeyContext<K> keyContext,
RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo,
TypeSerializer<K> keySerializer) {
- this(keyContext, metaInfo, DEFAULT_CAPACITY, keySerializer);
- }
-
- /**
- * Constructs a new {@code StateTable} instance with the specified
capacity.
- *
- * @param keyContext the key context.
- * @param metaInfo the meta information, including the type
serializer for state copy-on-write.
- * @param capacity the initial capacity of this hash map.
- * @param keySerializer the serializer of the key.
- * @throws IllegalArgumentException when the capacity is less than zero.
- */
- @SuppressWarnings("unchecked")
- private CopyOnWriteStateTable(
- InternalKeyContext<K> keyContext,
- RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo,
- int capacity,
- TypeSerializer<K> keySerializer) {
super(keyContext, metaInfo, keySerializer);
-
- // initialized tables to EMPTY_TABLE.
- this.primaryTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
- this.incrementalRehashTable = (StateTableEntry<K, N, S>[])
EMPTY_TABLE;
-
- // initialize sizes to 0.
- this.primaryTableSize = 0;
- this.incrementalRehashTableSize = 0;
-
- this.rehashIndex = 0;
- this.stateTableVersion = 0;
- this.highestRequiredSnapshotVersion = 0;
- this.snapshotVersions = new TreeSet<>();
-
- if (capacity < 0) {
- throw new IllegalArgumentException("Capacity: " +
capacity);
- }
-
- if (capacity == 0) {
- threshold = -1;
- return;
- }
-
- if (capacity < MINIMUM_CAPACITY) {
- capacity = MINIMUM_CAPACITY;
- } else if (capacity > MAXIMUM_CAPACITY) {
- capacity = MAXIMUM_CAPACITY;
- } else {
- capacity = MathUtils.roundUpToPowerOfTwo(capacity);
- }
- primaryTable = makeTable(capacity);
}
- // Public API from AbstractStateTable
------------------------------------------------------------------------------
-
- /**
- * Returns the total number of entries in this {@link
CopyOnWriteStateTable}. This is the sum of both sub-tables.
- *
- * @return the number of entries in this {@link CopyOnWriteStateTable}.
- */
- @Override
- public int size() {
- return primaryTableSize + incrementalRehashTableSize;
- }
-
- @Override
- public S get(K key, N namespace) {
-
- final int hash =
computeHashForOperationAndDoIncrementalRehash(key, namespace);
- final int requiredVersion = highestRequiredSnapshotVersion;
- final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
- int index = hash & (tab.length - 1);
-
- for (StateTableEntry<K, N, S> e = tab[index]; e != null; e =
e.next) {
- final K eKey = e.key;
- final N eNamespace = e.namespace;
- if ((e.hash == hash && key.equals(eKey) &&
namespace.equals(eNamespace))) {
-
- // copy-on-write check for state
- if (e.stateVersion < requiredVersion) {
- // copy-on-write check for entry
- if (e.entryVersion < requiredVersion) {
- e =
handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
- }
- e.stateVersion = stateTableVersion;
- e.state =
getStateSerializer().copy(e.state);
- }
-
- return e.state;
- }
- }
-
- return null;
- }
-
- @Override
- public Stream<K> getKeys(N namespace) {
- return StreamSupport.stream(spliterator(), false)
- .filter(entry -> entry.getNamespace().equals(namespace))
- .map(StateEntry::getKey);
- }
-
- @Override
- public void put(K key, int keyGroup, N namespace, S state) {
- put(key, namespace, state);
- }
-
- @Override
- public S get(N namespace) {
- return get(keyContext.getCurrentKey(), namespace);
- }
-
- @Override
- public boolean containsKey(N namespace) {
- return containsKey(keyContext.getCurrentKey(), namespace);
- }
-
- @Override
- public void put(N namespace, S state) {
- put(keyContext.getCurrentKey(), namespace, state);
- }
-
- @Override
- public S putAndGetOld(N namespace, S state) {
- return putAndGetOld(keyContext.getCurrentKey(), namespace,
state);
- }
-
- @Override
- public void remove(N namespace) {
- remove(keyContext.getCurrentKey(), namespace);
- }
-
- @Override
- public S removeAndGetOld(N namespace) {
- return removeAndGetOld(keyContext.getCurrentKey(), namespace);
- }
-
- @Override
- public <T> void transform(N namespace, T value,
StateTransformationFunction<S, T> transformation) throws Exception {
- transform(keyContext.getCurrentKey(), namespace, value,
transformation);
- }
-
- // Private implementation details of the API methods
---------------------------------------------------------------
-
- /**
- * Returns whether this table contains the specified key/namespace
composite key.
- *
- * @param key the key in the composite key to search for. Not
null.
- * @param namespace the namespace in the composite key to search for.
Not null.
- * @return {@code true} if this map contains the specified
key/namespace composite key,
- * {@code false} otherwise.
- */
- boolean containsKey(K key, N namespace) {
-
- final int hash =
computeHashForOperationAndDoIncrementalRehash(key, namespace);
- final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
- int index = hash & (tab.length - 1);
-
- for (StateTableEntry<K, N, S> e = tab[index]; e != null; e =
e.next) {
- final K eKey = e.key;
- final N eNamespace = e.namespace;
-
- if ((e.hash == hash && key.equals(eKey) &&
namespace.equals(eNamespace))) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * Maps the specified key/namespace composite key to the specified
value. This method should be preferred
- * over {@link #putAndGetOld(Object, Object, Object)} (Object, Object)}
when the caller is not interested
- * in the old value, because this can potentially reduce copy-on-write
activity.
- *
- * @param key the key. Not null.
- * @param namespace the namespace. Not null.
- * @param value the value. Can be null.
- */
- void put(K key, N namespace, S value) {
- final StateTableEntry<K, N, S> e = putEntry(key, namespace);
-
- e.state = value;
- e.stateVersion = stateTableVersion;
- }
-
- /**
- * Maps the specified key/namespace composite key to the specified
value. Returns the previous state that was
- * registered under the composite key.
- *
- * @param key the key. Not null.
- * @param namespace the namespace. Not null.
- * @param value the value. Can be null.
- * @return the value of any previous mapping with the specified key or
- * {@code null} if there was no such mapping.
- */
- S putAndGetOld(K key, N namespace, S value) {
-
- final StateTableEntry<K, N, S> e = putEntry(key, namespace);
-
- // copy-on-write check for state
- S oldState = (e.stateVersion < highestRequiredSnapshotVersion) ?
- getStateSerializer().copy(e.state) :
- e.state;
-
- e.state = value;
- e.stateVersion = stateTableVersion;
-
- return oldState;
- }
-
- /**
- * Removes the mapping with the specified key/namespace composite key
from this map. This method should be preferred
- * over {@link #removeAndGetOld(Object, Object)} when the caller is not
interested in the old value, because this
- * can potentially reduce copy-on-write activity.
- *
- * @param key the key of the mapping to remove. Not null.
- * @param namespace the namespace of the mapping to remove. Not null.
- */
- void remove(K key, N namespace) {
- removeEntry(key, namespace);
- }
-
- /**
- * Removes the mapping with the specified key/namespace composite key
from this map, returning the state that was
- * found under the entry.
- *
- * @param key the key of the mapping to remove. Not null.
- * @param namespace the namespace of the mapping to remove. Not null.
- * @return the value of the removed mapping or {@code null} if no
mapping
- * for the specified key was found.
- */
- S removeAndGetOld(K key, N namespace) {
-
- final StateTableEntry<K, N, S> e = removeEntry(key, namespace);
-
- return e != null ?
- // copy-on-write check for state
- (e.stateVersion <
highestRequiredSnapshotVersion ?
-
getStateSerializer().copy(e.state) :
- e.state) :
- null;
- }
-
- /**
- * @param key the key of the mapping to remove. Not null.
- * @param namespace the namespace of the mapping to remove. Not
null.
- * @param value the value that is the second input for the
transformation.
- * @param transformation the transformation function to apply on the
old state and the given value.
- * @param <T> type of the value that is the second input to
the {@link StateTransformationFunction}.
- * @throws Exception exception that happen on applying the function.
- * @see #transform(Object, Object, StateTransformationFunction).
- */
- <T> void transform(
- K key,
- N namespace,
- T value,
- StateTransformationFunction<S, T> transformation)
throws Exception {
-
- final StateTableEntry<K, N, S> entry = putEntry(key, namespace);
-
- // copy-on-write check for state
- entry.state = transformation.apply(
- (entry.stateVersion <
highestRequiredSnapshotVersion) ?
-
getStateSerializer().copy(entry.state) :
- entry.state,
- value);
- entry.stateVersion = stateTableVersion;
- }
-
- /**
- * Helper method that is the basis for operations that add mappings.
- */
- private StateTableEntry<K, N, S> putEntry(K key, N namespace) {
-
- final int hash =
computeHashForOperationAndDoIncrementalRehash(key, namespace);
- final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
- int index = hash & (tab.length - 1);
-
- for (StateTableEntry<K, N, S> e = tab[index]; e != null; e =
e.next) {
- if (e.hash == hash && key.equals(e.key) &&
namespace.equals(e.namespace)) {
-
- // copy-on-write check for entry
- if (e.entryVersion <
highestRequiredSnapshotVersion) {
- e = handleChainedEntryCopyOnWrite(tab,
index, e);
- }
-
- return e;
- }
- }
-
- ++modCount;
- if (size() > threshold) {
- doubleCapacity();
- }
-
- return addNewStateTableEntry(tab, key, namespace, hash);
- }
-
- /**
- * Helper method that is the basis for operations that remove mappings.
- */
- private StateTableEntry<K, N, S> removeEntry(K key, N namespace) {
-
- final int hash =
computeHashForOperationAndDoIncrementalRehash(key, namespace);
- final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
- int index = hash & (tab.length - 1);
-
- for (StateTableEntry<K, N, S> e = tab[index], prev = null; e !=
null; prev = e, e = e.next) {
- if (e.hash == hash && key.equals(e.key) &&
namespace.equals(e.namespace)) {
- if (prev == null) {
- tab[index] = e.next;
- } else {
- // copy-on-write check for entry
- if (prev.entryVersion <
highestRequiredSnapshotVersion) {
- prev =
handleChainedEntryCopyOnWrite(tab, index, prev);
- }
- prev.next = e.next;
- }
- ++modCount;
- if (tab == primaryTable) {
- --primaryTableSize;
- } else {
- --incrementalRehashTableSize;
- }
- return e;
- }
- }
- return null;
- }
-
- private void checkKeyNamespacePreconditions(K key, N namespace) {
- Preconditions.checkNotNull(key, "No key set. This method should
not be called outside of a keyed context.");
- Preconditions.checkNotNull(namespace, "Provided namespace is
null.");
- }
-
- // Meta data setter / getter and toString
--------------------------------------------------------------------------
-
- @Override
- public TypeSerializer<S> getStateSerializer() {
- return metaInfo.getStateSerializer();
- }
-
- @Override
- public TypeSerializer<N> getNamespaceSerializer() {
- return metaInfo.getNamespaceSerializer();
- }
-
- @Override
- public RegisteredKeyValueStateBackendMetaInfo<N, S> getMetaInfo() {
- return metaInfo;
- }
-
- @Override
- public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S>
metaInfo) {
- this.metaInfo = metaInfo;
- }
-
- // Iteration
------------------------------------------------------------------------------------------------------
-
- @Nonnull
@Override
- public Iterator<StateEntry<K, N, S>> iterator() {
- return new StateEntryIterator();
- }
-
- // Private utility functions for StateTable management
-------------------------------------------------------------
-
- /**
- * @see #releaseSnapshot(CopyOnWriteStateTableSnapshot)
- */
- @VisibleForTesting
- void releaseSnapshot(int snapshotVersion) {
- // we guard against concurrent modifications of
highestRequiredSnapshotVersion between snapshot and release.
- // Only stale reads of from the result of #releaseSnapshot
calls are ok.
- synchronized (snapshotVersions) {
-
Preconditions.checkState(snapshotVersions.remove(snapshotVersion), "Attempt to
release unknown snapshot version");
- highestRequiredSnapshotVersion =
snapshotVersions.isEmpty() ? 0 : snapshotVersions.last();
- }
- }
-
- /**
- * Creates (combined) copy of the table arrays for a snapshot. This
method must be called by the same Thread that
- * does modifications to the {@link CopyOnWriteStateTable}.
- */
- @VisibleForTesting
- @SuppressWarnings("unchecked")
- StateTableEntry<K, N, S>[] snapshotTableArrays() {
-
- // we guard against concurrent modifications of
highestRequiredSnapshotVersion between snapshot and release.
- // Only stale reads of from the result of #releaseSnapshot
calls are ok. This is why we must call this method
- // from the same thread that does all the modifications to the
table.
- synchronized (snapshotVersions) {
-
- // increase the table version for copy-on-write and
register the snapshot
- if (++stateTableVersion < 0) {
- // this is just a safety net against overflows,
but should never happen in practice (i.e., only after 2^31 snapshots)
- throw new IllegalStateException("Version count
overflow in CopyOnWriteStateTable. Enforcing restart.");
- }
-
- highestRequiredSnapshotVersion = stateTableVersion;
- snapshotVersions.add(highestRequiredSnapshotVersion);
- }
-
- StateTableEntry<K, N, S>[] table = primaryTable;
-
- // In order to reuse the copied array as the destination array
for the partitioned records in
- // CopyOnWriteStateTableSnapshot#partitionByKeyGroup(), we need
to make sure that the copied array
- // is big enough to hold the flattened entries. In fact, given
the current rehashing algorithm, we only
- // need to do this check when isRehashing() is false, but in
order to get a more robust code(in case that
- // the rehashing algorithm may changed in the future), we do
this check for all the case.
- final int totalTableIndexSize = rehashIndex + table.length;
- final int copiedArraySize = Math.max(totalTableIndexSize,
size());
- final StateTableEntry<K, N, S>[] copy = new
StateTableEntry[copiedArraySize];
-
- if (isRehashing()) {
- // consider both tables for the snapshot, the rehash
index tells us which part of the two tables we need
- final int localRehashIndex = rehashIndex;
- final int localCopyLength = table.length -
localRehashIndex;
- // for the primary table, take every index >= rhIdx.
- System.arraycopy(table, localRehashIndex, copy, 0,
localCopyLength);
-
- // for the new table, we are sure that two regions
contain all the entries:
- // [0, rhIdx[ AND [table.length / 2, table.length / 2 +
rhIdx[
- table = incrementalRehashTable;
- System.arraycopy(table, 0, copy, localCopyLength,
localRehashIndex);
- System.arraycopy(table, table.length >>> 1, copy,
localCopyLength + localRehashIndex, localRehashIndex);
- } else {
- // we only need to copy the primary table
- System.arraycopy(table, 0, copy, 0, table.length);
- }
-
- return copy;
- }
-
- /**
- * Allocate a table of the given capacity and set the threshold
accordingly.
- *
- * @param newCapacity must be a power of two
- */
- private StateTableEntry<K, N, S>[] makeTable(int newCapacity) {
-
- if (newCapacity < MAXIMUM_CAPACITY) {
- threshold = (newCapacity >> 1) + (newCapacity >> 2); //
3/4 capacity
- } else {
- if (size() > MAX_ARRAY_SIZE) {
-
- throw new IllegalStateException("Maximum
capacity of CopyOnWriteStateTable is reached and the job " +
- "cannot continue. Please consider
scaling-out your job or using a different keyed state backend " +
- "implementation!");
- } else {
-
- LOG.warn("Maximum capacity of 2^30 in
StateTable reached. Cannot increase hash table size. This can " +
- "lead to more collisions and lower
performance. Please consider scaling-out your job or using a " +
- "different keyed state backend
implementation!");
- threshold = MAX_ARRAY_SIZE;
- }
- }
-
- @SuppressWarnings("unchecked") StateTableEntry<K, N, S>[]
newTable
- = (StateTableEntry<K, N, S>[]) new
StateTableEntry[newCapacity];
- return newTable;
- }
-
- /**
- * Creates and inserts a new {@link StateTableEntry}.
- */
- private StateTableEntry<K, N, S> addNewStateTableEntry(
- StateTableEntry<K, N, S>[] table,
- K key,
- N namespace,
- int hash) {
-
- // small optimization that aims to avoid holding references on
duplicate namespace objects
- if (namespace.equals(lastNamespace)) {
- namespace = lastNamespace;
- } else {
- lastNamespace = namespace;
- }
-
- int index = hash & (table.length - 1);
- StateTableEntry<K, N, S> newEntry = new StateTableEntry<>(
- key,
- namespace,
- null,
- hash,
- table[index],
- stateTableVersion,
- stateTableVersion);
- table[index] = newEntry;
-
- if (table == primaryTable) {
- ++primaryTableSize;
- } else {
- ++incrementalRehashTableSize;
- }
- return newEntry;
- }
-
- /**
- * Select the sub-table which is responsible for entries with the given
hash code.
- *
- * @param hashCode the hash code which we use to decide about the table
that is responsible.
- * @return the index of the sub-table that is responsible for the entry
with the given hash code.
- */
- private StateTableEntry<K, N, S>[] selectActiveTable(int hashCode) {
- return (hashCode & (primaryTable.length - 1)) >= rehashIndex ?
primaryTable : incrementalRehashTable;
- }
-
- /**
- * Doubles the capacity of the hash table. Existing entries are placed
in
- * the correct bucket on the enlarged table. If the current capacity is,
- * MAXIMUM_CAPACITY, this method is a no-op. Returns the table, which
- * will be new unless we were already at MAXIMUM_CAPACITY.
- */
- private void doubleCapacity() {
-
- // There can only be one rehash in flight. From the amount of
incremental rehash steps we take, this should always hold.
- Preconditions.checkState(!isRehashing(), "There is already a
rehash in progress.");
-
- StateTableEntry<K, N, S>[] oldTable = primaryTable;
-
- int oldCapacity = oldTable.length;
-
- if (oldCapacity == MAXIMUM_CAPACITY) {
- return;
- }
-
- incrementalRehashTable = makeTable(oldCapacity * 2);
- }
-
- /**
- * Returns true, if an incremental rehash is in progress.
- */
- @VisibleForTesting
- boolean isRehashing() {
- // if we rehash, the secondary table is not empty
- return EMPTY_TABLE != incrementalRehashTable;
- }
-
- /**
- * Computes the hash for the composite of key and namespace and
performs some steps of incremental rehash if
- * incremental rehashing is in progress.
- */
- private int computeHashForOperationAndDoIncrementalRehash(K key, N
namespace) {
-
- checkKeyNamespacePreconditions(key, namespace);
-
- if (isRehashing()) {
- incrementalRehash();
- }
-
- return compositeHash(key, namespace);
- }
-
- /**
- * Runs a number of steps for incremental rehashing.
- */
- @SuppressWarnings("unchecked")
- private void incrementalRehash() {
-
- StateTableEntry<K, N, S>[] oldTable = primaryTable;
- StateTableEntry<K, N, S>[] newTable = incrementalRehashTable;
-
- int oldCapacity = oldTable.length;
- int newMask = newTable.length - 1;
- int requiredVersion = highestRequiredSnapshotVersion;
- int rhIdx = rehashIndex;
- int transferred = 0;
-
- // we migrate a certain minimum amount of entries from the old
to the new table
- while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {
-
- StateTableEntry<K, N, S> e = oldTable[rhIdx];
-
- while (e != null) {
- // copy-on-write check for entry
- if (e.entryVersion < requiredVersion) {
- e = new StateTableEntry<>(e,
stateTableVersion);
- }
- StateTableEntry<K, N, S> n = e.next;
- int pos = e.hash & newMask;
- e.next = newTable[pos];
- newTable[pos] = e;
- e = n;
- ++transferred;
- }
-
- oldTable[rhIdx] = null;
- if (++rhIdx == oldCapacity) {
- //here, the rehash is complete and we release
resources and reset fields
- primaryTable = newTable;
- incrementalRehashTable = (StateTableEntry<K, N,
S>[]) EMPTY_TABLE;
- primaryTableSize += incrementalRehashTableSize;
- incrementalRehashTableSize = 0;
- rehashIndex = 0;
- return;
- }
- }
-
- // sync our local bookkeeping the with official bookkeeping
fields
- primaryTableSize -= transferred;
- incrementalRehashTableSize += transferred;
- rehashIndex = rhIdx;
- }
-
- /**
- * Perform copy-on-write for entry chains. We iterate the (hopefully
and probably) still cached chain, replace
- * all links up to the 'untilEntry', which we actually wanted to modify.
- */
- private StateTableEntry<K, N, S> handleChainedEntryCopyOnWrite(
- StateTableEntry<K, N, S>[] tab,
- int tableIdx,
- StateTableEntry<K, N, S> untilEntry) {
-
- final int required = highestRequiredSnapshotVersion;
-
- StateTableEntry<K, N, S> current = tab[tableIdx];
- StateTableEntry<K, N, S> copy;
-
- if (current.entryVersion < required) {
- copy = new StateTableEntry<>(current,
stateTableVersion);
- tab[tableIdx] = copy;
- } else {
- // nothing to do, just advance copy to current
- copy = current;
- }
-
- // we iterate the chain up to 'until entry'
- while (current != untilEntry) {
-
- //advance current
- current = current.next;
-
- if (current.entryVersion < required) {
- // copy and advance the current's copy
- copy.next = new StateTableEntry<>(current,
stateTableVersion);
- copy = copy.next;
- } else {
- // nothing to do, just advance copy to current
- copy = current;
- }
- }
-
- return copy;
- }
-
- @SuppressWarnings("unchecked")
- private static <K, N, S> StateTableEntry<K, N, S> getBootstrapEntry() {
- return (StateTableEntry<K, N, S>) ITERATOR_BOOTSTRAP_ENTRY;
- }
-
- /**
- * Helper function that creates and scrambles a composite hash for key
and namespace.
- */
- private static int compositeHash(Object key, Object namespace) {
- // create composite key through XOR, then apply some bit-mixing
for better distribution of skewed keys.
- return MathUtils.bitMix(key.hashCode() ^ namespace.hashCode());
+ protected CopyOnWriteStateMap<K, N, S> createStateMap() {
+ return new CopyOnWriteStateMap<>(getStateSerializer());
}
// Snapshotting
----------------------------------------------------------------------------------------------------
- int getStateTableVersion() {
- return stateTableVersion;
- }
-
/**
- * Creates a snapshot of this {@link CopyOnWriteStateTable}, to be
written in checkpointing. The snapshot integrity
- * is protected through copy-on-write from the {@link
CopyOnWriteStateTable}. Users should call
- * {@link #releaseSnapshot(CopyOnWriteStateTableSnapshot)} after using
the returned object.
+ * Creates a snapshot of this {@link CopyOnWriteStateTable}, to be
written in checkpointing.
*
* @return a snapshot from this {@link CopyOnWriteStateTable}, for
checkpointing.
*/
@Nonnull
@Override
public CopyOnWriteStateTableSnapshot<K, N, S> stateSnapshot() {
- return new CopyOnWriteStateTableSnapshot<>(this);
+ return new CopyOnWriteStateTableSnapshot<>(
+ this,
+ getKeySerializer().duplicate(),
+ getNamespaceSerializer().duplicate(),
+ getStateSerializer().duplicate(),
+
getMetaInfo().getStateSnapshotTransformFactory().createForDeserializedState().orElse(null));
}
- /**
- * Releases a snapshot for this {@link CopyOnWriteStateTable}. This
method should be called once a snapshot is no more needed,
- * so that the {@link CopyOnWriteStateTable} can stop considering this
snapshot for copy-on-write, thus avoiding unnecessary
- * object creation.
- *
- * @param snapshotToRelease the snapshot to release, which was
previously created by this state table.
- */
- void releaseSnapshot(CopyOnWriteStateTableSnapshot<K, N, S>
snapshotToRelease) {
-
- Preconditions.checkArgument(snapshotToRelease.isOwner(this),
- "Cannot release snapshot which is owned by a
different state table.");
-
- releaseSnapshot(snapshotToRelease.getSnapshotVersion());
- }
-
- // StateTableEntry
-------------------------------------------------------------------------------------------------
-
- /**
- * One entry in the {@link CopyOnWriteStateTable}. This is a triplet of
key, namespace, and state. Thereby, key and
- * namespace together serve as a composite key for the state. This
class also contains some management meta data for
- * copy-on-write, a pointer to link other {@link StateTableEntry}s to a
list, and cached hash code.
- *
- * @param <K> type of key.
- * @param <N> type of namespace.
- * @param <S> type of state.
- */
- @VisibleForTesting
- protected static class StateTableEntry<K, N, S> implements
StateEntry<K, N, S> {
-
- /**
- * The key. Assumed to be immutable and not null.
- */
- @Nonnull
- final K key;
-
- /**
- * The namespace. Assumed to be immutable and not null.
- */
- @Nonnull
- final N namespace;
-
- /**
- * The state. This is not final to allow exchanging the object
for copy-on-write. Can be null.
- */
- @Nullable
- S state;
-
- /**
- * Link to another {@link StateTableEntry}. This is used to
resolve collisions in the
- * {@link CopyOnWriteStateTable} through chaining.
- */
- @Nullable
- StateTableEntry<K, N, S> next;
-
- /**
- * The version of this {@link StateTableEntry}. This is meta
data for copy-on-write of the table structure.
- */
- int entryVersion;
-
- /**
- * The version of the state object in this entry. This is meta
data for copy-on-write of the state object itself.
- */
- int stateVersion;
-
- /**
- * The computed secondary hash for the composite of key and
namespace.
- */
- final int hash;
-
- StateTableEntry(StateTableEntry<K, N, S> other, int
entryVersion) {
- this(other.key, other.namespace, other.state,
other.hash, other.next, entryVersion, other.stateVersion);
- }
-
- StateTableEntry(
- @Nonnull K key,
- @Nonnull N namespace,
- @Nullable S state,
- int hash,
- @Nullable StateTableEntry<K, N, S> next,
- int entryVersion,
- int stateVersion) {
- this.key = key;
- this.namespace = namespace;
- this.hash = hash;
- this.next = next;
- this.entryVersion = entryVersion;
- this.state = state;
- this.stateVersion = stateVersion;
- }
-
- public final void setState(@Nullable S value, int mapVersion) {
- // naturally, we can update the state version every
time we replace the old state with a different object
- if (value != state) {
- this.state = value;
- this.stateVersion = mapVersion;
+ CopyOnWriteStateMapSnapshot<K, N, S>[] getStateMapSnapshotArray() {
+ CopyOnWriteStateMapSnapshot<K, N, S>[] snapshotArray =
+ new CopyOnWriteStateMapSnapshot[state.length];
Review comment:
I would suggest to suppress the warning for unchecked assignment here or, as
this place is not really performance critical, rather just use array list
instead of array.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services