This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8be2a8ed4e0 MINOR: Add javadocs to 
AbstractMergedSortedCacheStoreIterator (#18772)
8be2a8ed4e0 is described below

commit 8be2a8ed4e0ecea9a213c5a5ee72ddb2029774ac
Author: Joao Pedro Fonseca Dantas <[email protected]>
AuthorDate: Wed Feb 5 22:20:53 2025 -0300

    MINOR: Add javadocs to AbstractMergedSortedCacheStoreIterator (#18772)
    
    While reviewing PR #18287, I wrote some javadocs to help me understand the 
AbstractMergedSortedCacheStoreIterator. Maybe we could add them to help the 
next developers who get into it.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../AbstractMergedSortedCacheStoreIterator.java    | 136 ++++++++++++++++++++-
 1 file changed, 133 insertions(+), 3 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
index 834b18f6491..e39257adba9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
@@ -23,16 +23,31 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import java.util.NoSuchElementException;
 
 /**
- * Merges two iterators. Assumes each of them is sorted by key
+ * AbstractMergedSortedCacheStoreIterator is an abstract class for merging two 
sorted iterators, one from a cache and
+ * the other from a store. It ensures the merged results maintain sorted order 
while resolving conflicts between cache
+ * and store entries.
  *
- * @param <K>
- * @param <V>
+ * <p>This iterator is used for state stores in Kafka Streams, which have an 
(optional) caching layer that needs to be
+ * "merged" with the underlying state. It handles common scenarios like 
skipping records with cached tombstones (deleted
+ * entries) and preferring cache entries over store entries when conflicts 
arise.</p>
+ *
+ * @param <K>  The type of the resulting merged key.
+ * @param <KS> The type of the store key.
+ * @param <V>  The type of the resulting merged value.
+ * @param <VS> The type of the store value.
  */
 abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements 
KeyValueIterator<K, V> {
     private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
     private final KeyValueIterator<KS, VS> storeIterator;
     private final boolean forward;
 
+    /**
+     * Constructs an AbstractMergedSortedCacheStoreIterator.
+     *
+     * @param cacheIterator The iterator for the cache, assumed to be sorted 
by key.
+     * @param storeIterator The iterator for the store, assumed to be sorted 
by key.
+     * @param forward       The direction of iteration. True for forward, 
false for reverse.
+     */
     AbstractMergedSortedCacheStoreIterator(final 
PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
                                            final KeyValueIterator<KS, VS> 
storeIterator,
                                            final boolean forward) {
@@ -41,20 +56,79 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, 
KS, V, VS> implements K
         this.forward = forward;
     }
 
+    /**
+     * Compares the keys from the cache and store to determine their ordering.
+     *
+     * @param cacheKey The key from the cache.
+     * @param storeKey The key from the store.
+     *
+     * @return A negative integer, zero, or a positive integer as the cache 
key is less than,
+     *     equal to, or greater than the store key.
+     */
     abstract int compare(final Bytes cacheKey, final KS storeKey);
 
+    /**
+     * Deserializes a store key into a generic merged key type.
+     *
+     * @param key The store key to deserialize.
+     *
+     * @return The deserialized key.
+     */
     abstract K deserializeStoreKey(final KS key);
 
+    /**
+     * Deserializes a key-value pair from the store into a generic merged 
key-value pair.
+     *
+     * @param pair The key-value pair from the store.
+     *
+     * @return The deserialized key-value pair.
+     */
     abstract KeyValue<K, V> deserializeStorePair(final KeyValue<KS, VS> pair);
 
+    /**
+     * Deserializes a cache key into a generic merged key type.
+     *
+     * @param cacheKey The cache key to deserialize.
+     *
+     * @return The deserialized key.
+     */
     abstract K deserializeCacheKey(final Bytes cacheKey);
 
+    /**
+     * Deserializes a cache entry into a generic value type.
+     *
+     * @param cacheEntry The cache entry to deserialize.
+     *
+     * @return The deserialized value.
+     */
     abstract V deserializeCacheValue(final LRUCacheEntry cacheEntry);
 
+    /**
+     * Checks if a cache entry is a tombstone (representing a deleted value).
+     *
+     * @param nextFromCache The cache entry to check.
+     *
+     * @return True if the cache entry is a tombstone, false otherwise.
+     */
     private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> 
nextFromCache) {
         return nextFromCache.value.value() == null;
     }
 
+    /**
+     * Determines if there are more entries to iterate over, resolving 
conflicts between cache and store entries (e.g.,
+     * skipping tombstones).
+     *
+     * <p>Conflict resolution scenarios:</p>
+     *
+     * <ul>
+     * <li><b>Cache contains a tombstone for a key:</b> Skip both the cache 
tombstone and the corresponding store entry (if it exists).</li>
+     * <li><b>Cache contains a value for a key present in the store:</b> 
Prefer the cache value and skip the store entry.</li>
+     * <li><b>Cache key is unique:</b> Return the cache value.</li>
+     * <li><b>Store key is unique:</b> Return the store value.</li>
+     * </ul>
+     *
+     * @return True if there are more entries, false otherwise.
+     */
     @Override
     public boolean hasNext() {
         // skip over items deleted from cache, and corresponding store items 
if they have the same key
@@ -86,6 +160,13 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, 
KS, V, VS> implements K
         return cacheIterator.hasNext() || storeIterator.hasNext();
     }
 
+    /**
+     * Retrieves the next key-value pair in the merged iteration.
+     *
+     * @return The next key-value pair.
+     *
+     * @throws NoSuchElementException If there are no more elements to iterate.
+     */
     @Override
     public KeyValue<K, V> next() {
         if (!hasNext()) {
@@ -107,6 +188,15 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, 
KS, V, VS> implements K
         return chooseNextValue(nextCacheKey, nextStoreKey, comparison);
     }
 
+    /**
+     * Resolves which source (cache or store) to fetch the next key-value pair 
from, based on the given key comparison result.
+     *
+     * @param nextCacheKey The next key from the cache.
+     * @param nextStoreKey The next key from the store.
+     * @param comparison   The comparison result between the cache and store 
keys.
+     *
+     * @return The next key-value pair.
+     */
     private KeyValue<K, V> chooseNextValue(final Bytes nextCacheKey,
                                            final KS nextStoreKey,
                                            final int comparison) {
@@ -133,6 +223,15 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, 
KS, V, VS> implements K
         }
     }
 
+    /**
+     * Fetches the next value from the store, ensuring it matches the expected 
key.
+     *
+     * @param nextStoreKey The expected next key from the store.
+     *
+     * @return The next key-value pair from the store.
+     *
+     * @throws IllegalStateException If the key does not match the expected 
key.
+     */
     private KeyValue<K, V> nextStoreValue(final KS nextStoreKey) {
         final KeyValue<KS, VS> next = storeIterator.next();
 
@@ -143,6 +242,15 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, 
KS, V, VS> implements K
         return deserializeStorePair(next);
     }
 
+    /**
+     * Fetches the next value from the cache, ensuring it matches the expected 
key.
+     *
+     * @param nextCacheKey The expected next key from the cache.
+     *
+     * @return The next key-value pair from the cache.
+     *
+     * @throws IllegalStateException If the key does not match the expected 
key.
+     */
     private KeyValue<K, V> nextCacheValue(final Bytes nextCacheKey) {
         final KeyValue<Bytes, LRUCacheEntry> next = cacheIterator.next();
 
@@ -153,6 +261,13 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, 
KS, V, VS> implements K
         return KeyValue.pair(deserializeCacheKey(next.key), 
deserializeCacheValue(next.value));
     }
 
+    /**
+     * Peeks at the next key in the merged iteration without advancing the 
iterator.
+     *
+     * @return The next key in the iteration.
+     *
+     * @throws NoSuchElementException If there are no more elements to peek.
+     */
     @Override
     public K peekNextKey() {
         if (!hasNext()) {
@@ -174,6 +289,18 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, 
KS, V, VS> implements K
         return chooseNextKey(nextCacheKey, nextStoreKey, comparison);
     }
 
+    /**
+     * Determines the next key to return from the merged iteration based on 
the comparison of the cache and store keys.
+     * Resolves conflicts by considering the iteration direction and ensuring 
the merged order is maintained.
+     *
+     * @param nextCacheKey The next key from the cache.
+     * @param nextStoreKey The next key from the store.
+     * @param comparison   The comparison result between the cache and store 
keys. A negative value indicates the cache
+     *                     key is smaller, zero indicates equality, and a 
positive value indicates the store key is
+     *                     smaller.
+     *
+     * @return The next key to return from the merged iteration.
+     */
     private K chooseNextKey(final Bytes nextCacheKey,
                             final KS nextStoreKey,
                             final int comparison) {
@@ -200,6 +327,9 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, 
KS, V, VS> implements K
         }
     }
 
+    /**
+     * Closes the iterators and releases any associated resources.
+     */
     @Override
     public void close() {
         cacheIterator.close();

Reply via email to