This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8be2a8ed4e0 MINOR: Add javadocs to
AbstractMergedSortedCacheStoreIterator (#18772)
8be2a8ed4e0 is described below
commit 8be2a8ed4e0ecea9a213c5a5ee72ddb2029774ac
Author: Joao Pedro Fonseca Dantas <[email protected]>
AuthorDate: Wed Feb 5 22:20:53 2025 -0300
MINOR: Add javadocs to AbstractMergedSortedCacheStoreIterator (#18772)
While reviewing PR #18287, I wrote some javadocs to help me understand the
AbstractMergedSortedCacheStoreIterator. Maybe we could add them to help the
next developers getting into it.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../AbstractMergedSortedCacheStoreIterator.java | 136 ++++++++++++++++++++-
1 file changed, 133 insertions(+), 3 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
index 834b18f6491..e39257adba9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
@@ -23,16 +23,31 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import java.util.NoSuchElementException;
/**
- * Merges two iterators. Assumes each of them is sorted by key
+ * AbstractMergedSortedCacheStoreIterator is an abstract class for merging two
sorted iterators, one from a cache and
+ * the other from a store. It ensures the merged results maintain sorted order
while resolving conflicts between cache
+ * and store entries.
*
- * @param <K>
- * @param <V>
+ * <p>This iterator is used for state stores in Kafka Streams, which have an
(optional) caching layer that needs to be
+ * "merged" with the underlying state. It handles common scenarios like
skipping records with cached tombstones (deleted
+ * entries) and preferring cache entries over store entries when conflicts
arise.</p>
+ *
+ * @param <K> The type of the resulting merged key.
+ * @param <KS> The type of the store key.
+ * @param <V> The type of the resulting merged value.
+ * @param <VS> The type of the store value.
*/
abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements
KeyValueIterator<K, V> {
private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
private final KeyValueIterator<KS, VS> storeIterator;
private final boolean forward;
+ /**
+ * Constructs an AbstractMergedSortedCacheStoreIterator.
+ *
+ * @param cacheIterator The iterator for the cache, assumed to be sorted
by key.
+ * @param storeIterator The iterator for the store, assumed to be sorted
by key.
+ * @param forward The direction of iteration. True for forward,
false for reverse.
+ */
AbstractMergedSortedCacheStoreIterator(final
PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
final KeyValueIterator<KS, VS>
storeIterator,
final boolean forward) {
@@ -41,20 +56,79 @@ abstract class AbstractMergedSortedCacheStoreIterator<K,
KS, V, VS> implements K
this.forward = forward;
}
+ /**
+ * Compares the keys from the cache and store to determine their ordering.
+ *
+ * @param cacheKey The key from the cache.
+ * @param storeKey The key from the store.
+ *
+ * @return A negative integer, zero, or a positive integer as the cache
key is less than,
+ * equal to, or greater than the store key.
+ */
abstract int compare(final Bytes cacheKey, final KS storeKey);
+ /**
+ * Deserializes a store key into a generic merged key type.
+ *
+ * @param key The store key to deserialize.
+ *
+ * @return The deserialized key.
+ */
abstract K deserializeStoreKey(final KS key);
+ /**
+ * Deserializes a key-value pair from the store into a generic merged
key-value pair.
+ *
+ * @param pair The key-value pair from the store.
+ *
+ * @return The deserialized key-value pair.
+ */
abstract KeyValue<K, V> deserializeStorePair(final KeyValue<KS, VS> pair);
+ /**
+ * Deserializes a cache key into a generic merged key type.
+ *
+ * @param cacheKey The cache key to deserialize.
+ *
+ * @return The deserialized key.
+ */
abstract K deserializeCacheKey(final Bytes cacheKey);
+ /**
+ * Deserializes a cache entry into a generic value type.
+ *
+ * @param cacheEntry The cache entry to deserialize.
+ *
+ * @return The deserialized value.
+ */
abstract V deserializeCacheValue(final LRUCacheEntry cacheEntry);
+ /**
+ * Checks if a cache entry is a tombstone (representing a deleted value).
+ *
+ * @param nextFromCache The cache entry to check.
+ *
+ * @return True if the cache entry is a tombstone, false otherwise.
+ */
private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry>
nextFromCache) {
return nextFromCache.value.value() == null;
}
+ /**
+ * Determines if there are more entries to iterate over, resolving
conflicts between cache and store entries (e.g.,
+ * skipping tombstones).
+ *
+ * <p>Conflict resolution scenarios:</p>
+ *
+ * <ul>
+ * <li><b>Cache contains a tombstone for a key:</b> Skip both the cache
tombstone and the corresponding store entry (if one exists).</li>
+ * <li><b>Cache contains a value for a key present in the store:</b>
Prefer the cache value and skip the store entry.</li>
+ * <li><b>Cache key is unique:</b> Return the cache value.</li>
+ * <li><b>Store key is unique:</b> Return the store value.</li>
+ * </ul>
+ *
+ * @return True if there are more entries, false otherwise.
+ */
@Override
public boolean hasNext() {
// skip over items deleted from cache, and corresponding store items
if they have the same key
@@ -86,6 +160,13 @@ abstract class AbstractMergedSortedCacheStoreIterator<K,
KS, V, VS> implements K
return cacheIterator.hasNext() || storeIterator.hasNext();
}
+ /**
+ * Retrieves the next key-value pair in the merged iteration.
+ *
+ * @return The next key-value pair.
+ *
+ * @throws NoSuchElementException If there are no more elements to iterate.
+ */
@Override
public KeyValue<K, V> next() {
if (!hasNext()) {
@@ -107,6 +188,15 @@ abstract class AbstractMergedSortedCacheStoreIterator<K,
KS, V, VS> implements K
return chooseNextValue(nextCacheKey, nextStoreKey, comparison);
}
+ /**
+ * Resolves which source (cache or store) to fetch the next key-value pair
from, based on the result of comparing the cache and store keys.
+ *
+ * @param nextCacheKey The next key from the cache.
+ * @param nextStoreKey The next key from the store.
+ * @param comparison The comparison result between the cache and store
keys.
+ *
+ * @return The next key-value pair.
+ */
private KeyValue<K, V> chooseNextValue(final Bytes nextCacheKey,
final KS nextStoreKey,
final int comparison) {
@@ -133,6 +223,15 @@ abstract class AbstractMergedSortedCacheStoreIterator<K,
KS, V, VS> implements K
}
}
+ /**
+ * Fetches the next value from the store, ensuring it matches the expected
key.
+ *
+ * @param nextStoreKey The expected next key from the store.
+ *
+ * @return The next key-value pair from the store.
+ *
+ * @throws IllegalStateException If the key does not match the expected
key.
+ */
private KeyValue<K, V> nextStoreValue(final KS nextStoreKey) {
final KeyValue<KS, VS> next = storeIterator.next();
@@ -143,6 +242,15 @@ abstract class AbstractMergedSortedCacheStoreIterator<K,
KS, V, VS> implements K
return deserializeStorePair(next);
}
+ /**
+ * Fetches the next value from the cache, ensuring it matches the expected
key.
+ *
+ * @param nextCacheKey The expected next key from the cache.
+ *
+ * @return The next key-value pair from the cache.
+ *
+ * @throws IllegalStateException If the key does not match the expected
key.
+ */
private KeyValue<K, V> nextCacheValue(final Bytes nextCacheKey) {
final KeyValue<Bytes, LRUCacheEntry> next = cacheIterator.next();
@@ -153,6 +261,13 @@ abstract class AbstractMergedSortedCacheStoreIterator<K,
KS, V, VS> implements K
return KeyValue.pair(deserializeCacheKey(next.key),
deserializeCacheValue(next.value));
}
+ /**
+ * Peeks at the next key in the merged iteration without advancing the
iterator.
+ *
+ * @return The next key in the iteration.
+ *
+ * @throws NoSuchElementException If there are no more elements to peek.
+ */
@Override
public K peekNextKey() {
if (!hasNext()) {
@@ -174,6 +289,18 @@ abstract class AbstractMergedSortedCacheStoreIterator<K,
KS, V, VS> implements K
return chooseNextKey(nextCacheKey, nextStoreKey, comparison);
}
+ /**
+ * Determines the next key to return from the merged iteration based on
the comparison of the cache and store keys.
+ * Resolves conflicts by considering the iteration direction and ensuring
the merged order is maintained.
+ *
+ * @param nextCacheKey The next key from the cache.
+ * @param nextStoreKey The next key from the store.
+ * @param comparison The comparison result between the cache and store
keys. A negative value indicates the cache
+ * key is smaller, zero indicates equality, and a
positive value indicates the store key is
+ * smaller.
+ *
+ * @return The next key to return from the merged iteration.
+ */
private K chooseNextKey(final Bytes nextCacheKey,
final KS nextStoreKey,
final int comparison) {
@@ -200,6 +327,9 @@ abstract class AbstractMergedSortedCacheStoreIterator<K,
KS, V, VS> implements K
}
}
+ /**
+ * Closes the iterators and releases any associated resources.
+ */
@Override
public void close() {
cacheIterator.close();