Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1950#discussion_r123981715 --- Diff: storm-core/src/jvm/org/apache/storm/state/BaseBinaryStateIterator.java --- @@ -0,0 +1,162 @@ +package org.apache.storm.state; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import com.google.common.primitives.UnsignedBytes; + +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.TreeSet; + +/** + * Base implementation of iterator over {@link KeyValueState} which is based on binary type. + */ +public abstract class BaseBinaryStateIterator<K, V> implements Iterator<Map.Entry<K, V>> { + + private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator; + private final PeekingIterator<Map.Entry<byte[], byte[]>> pendingCommitIterator; + private final Set<byte[]> providedKeys; + + private boolean firstLoad = true; + private PeekingIterator<Map.Entry<byte[], byte[]>> pendingIterator; + private PeekingIterator<Map.Entry<byte[], byte[]>> cachedResultIterator; + + /** + * Constructor. 
+ * + * @param pendingPrepareIterator The iterator of pendingPrepare + * @param pendingCommitIterator The iterator of pendingCommit + */ + public BaseBinaryStateIterator(Iterator<Map.Entry<byte[], byte[]>> pendingPrepareIterator, + Iterator<Map.Entry<byte[], byte[]>> pendingCommitIterator) { + this.pendingPrepareIterator = Iterators.peekingIterator(pendingPrepareIterator); + this.pendingCommitIterator = Iterators.peekingIterator(pendingCommitIterator); + this.providedKeys = new TreeSet<>(UnsignedBytes.lexicographicalComparator()); + } + + @Override + public boolean hasNext() { + if (seekToAvailableEntry(pendingPrepareIterator)) { + pendingIterator = pendingPrepareIterator; + return true; + } + + if (seekToAvailableEntry(pendingCommitIterator)) { + pendingIterator = pendingCommitIterator; + return true; + } + + + if (firstLoad) { + // load the first part of entries + fillCachedResultIterator(); + firstLoad = false; + } + + while (true) { + if (seekToAvailableEntry(cachedResultIterator)) { + pendingIterator = cachedResultIterator; + return true; + } + + if (isEndOfDataFromStorage()) { + break; + } + + fillCachedResultIterator(); + } + + pendingIterator = null; + return false; + } + + private void fillCachedResultIterator() { + Iterator<Map.Entry<byte[], byte[]>> iterator = loadChunkFromStateStorage(); + if (iterator != null) { + cachedResultIterator = Iterators.peekingIterator(iterator); + } else { + cachedResultIterator = null; + } + } + + @Override + public Map.Entry<K, V> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + Map.Entry<byte[], byte[]> keyValue = pendingIterator.next(); + + K key = decodeKey(keyValue.getKey()); + V value = decodeValue(keyValue.getValue()); + + providedKeys.add(keyValue.getKey()); + return new AbstractMap.SimpleEntry(key, value); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * Load some part of state KVs from storage and returns iterator of cached data from 
storage. + * + * @return Iterator of loaded state KVs + */ + protected abstract Iterator<Map.Entry<byte[],byte[]>> loadChunkFromStateStorage(); --- End diff -- Does this assume that the state implementations store their data in binary format? Couldn't it return `Iterator<Map.Entry<K, V>>` instead, so that it does not assume any specific implementation and lets subclasses do the decoding?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---