Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1950#discussion_r123981715
  
    --- Diff: 
storm-core/src/jvm/org/apache/storm/state/BaseBinaryStateIterator.java ---
    @@ -0,0 +1,162 @@
    +package org.apache.storm.state;
    +
    +import com.google.common.collect.Iterators;
    +import com.google.common.collect.PeekingIterator;
    +import com.google.common.primitives.UnsignedBytes;
    +
    +import java.util.AbstractMap;
    +import java.util.Arrays;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.NoSuchElementException;
    +import java.util.Set;
    +import java.util.TreeSet;
    +
    +/**
    + * Base implementation of iterator over {@link KeyValueState} which is 
based on binary type.
    + */
    +public abstract class BaseBinaryStateIterator<K, V> implements 
Iterator<Map.Entry<K, V>> {
    +
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> 
pendingPrepareIterator;
    +  private final PeekingIterator<Map.Entry<byte[], byte[]>> 
pendingCommitIterator;
    +  private final Set<byte[]> providedKeys;
    +
    +  private boolean firstLoad = true;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> pendingIterator;
    +  private PeekingIterator<Map.Entry<byte[], byte[]>> cachedResultIterator;
    +
    +  /**
    +   * Constructor.
    +   *
    +   * @param pendingPrepareIterator The iterator of pendingPrepare
    +   * @param pendingCommitIterator The iterator of pendingCommit
    +   */
    +  public BaseBinaryStateIterator(Iterator<Map.Entry<byte[], byte[]>> 
pendingPrepareIterator,
    +      Iterator<Map.Entry<byte[], byte[]>> pendingCommitIterator) {
    +    this.pendingPrepareIterator = 
Iterators.peekingIterator(pendingPrepareIterator);
    +    this.pendingCommitIterator = 
Iterators.peekingIterator(pendingCommitIterator);
    +    this.providedKeys = new 
TreeSet<>(UnsignedBytes.lexicographicalComparator());
    +  }
    +
    +  @Override
    +  public boolean hasNext() {
    +    if (seekToAvailableEntry(pendingPrepareIterator)) {
    +      pendingIterator = pendingPrepareIterator;
    +      return true;
    +    }
    +
    +    if (seekToAvailableEntry(pendingCommitIterator)) {
    +      pendingIterator = pendingCommitIterator;
    +      return true;
    +    }
    +
    +
    +    if (firstLoad) {
    +      // load the first part of entries
    +      fillCachedResultIterator();
    +      firstLoad = false;
    +    }
    +
    +    while (true) {
    +      if (seekToAvailableEntry(cachedResultIterator)) {
    +        pendingIterator = cachedResultIterator;
    +        return true;
    +      }
    +
    +      if (isEndOfDataFromStorage()) {
    +        break;
    +      }
    +
    +      fillCachedResultIterator();
    +    }
    +
    +    pendingIterator = null;
    +    return false;
    +  }
    +
    +  private void fillCachedResultIterator() {
    +    Iterator<Map.Entry<byte[], byte[]>> iterator = 
loadChunkFromStateStorage();
    +    if (iterator != null) {
    +      cachedResultIterator = Iterators.peekingIterator(iterator);
    +    } else {
    +      cachedResultIterator = null;
    +    }
    +  }
    +
    +  @Override
    +  public Map.Entry<K, V> next() {
    +    if (!hasNext()) {
    +      throw new NoSuchElementException();
    +    }
    +
    +    Map.Entry<byte[], byte[]> keyValue = pendingIterator.next();
    +
    +    K key = decodeKey(keyValue.getKey());
    +    V value = decodeValue(keyValue.getValue());
    +
    +    providedKeys.add(keyValue.getKey());
    +    return new AbstractMap.SimpleEntry(key, value);
    +  }
    +
    +  @Override
    +  public void remove() {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  /**
    +   * Load some part of state KVs from storage and returns iterator of 
cached data from storage.
    +   *
    +   * @return Iterator of loaded state KVs
    +   */
    +  protected abstract Iterator<Map.Entry<byte[],byte[]>> 
loadChunkFromStateStorage();
    --- End diff --
    
    Does this assume that all state implementations store their data in binary 
format? Couldn't it return `Iterator<Map.Entry<K, V>>` instead, so that it does 
not assume any specific implementation and lets sub-classes do the decoding?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to