scwhittle commented on code in PR #34746:
URL: https://github.com/apache/beam/pull/34746#discussion_r2075205935


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java:
##########
@@ -91,6 +94,82 @@ public static <T> CachingStateIterable<T> 
readAllAndDecodeStartingFrom(
         valueCoder);
   }
 
+  public static <T> UncachedStateIterable<T> readAllAndDecodeStartingFrom(
+      BeamFnStateClient beamFnStateClient,
+      StateRequest stateRequestForFirstChunk,
+      Coder<T> valueCoder) {
+    return new UncachedStateIterable<>(beamFnStateClient, 
stateRequestForFirstChunk, valueCoder);
+  }
+
+  static class UncachedStateIterable<T> extends 
PrefetchableIterables.Default<T> {

Review Comment:
   It is used for BagUserState, MultimapState etc. 
https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L76



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java:
##########
@@ -91,6 +94,82 @@ public static <T> CachingStateIterable<T> 
readAllAndDecodeStartingFrom(
         valueCoder);
   }
 
+  public static <T> UncachedStateIterable<T> readAllAndDecodeStartingFrom(

Review Comment:
   add comment, could copy relevant info from the caching method



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java:
##########
@@ -91,6 +94,82 @@ public static <T> CachingStateIterable<T> 
readAllAndDecodeStartingFrom(
         valueCoder);
   }
 
+  public static <T> UncachedStateIterable<T> readAllAndDecodeStartingFrom(
+      BeamFnStateClient beamFnStateClient,
+      StateRequest stateRequestForFirstChunk,
+      Coder<T> valueCoder) {
+    return new UncachedStateIterable<>(beamFnStateClient, 
stateRequestForFirstChunk, valueCoder);
+  }
+
+  static class UncachedStateIterable<T> extends 
PrefetchableIterables.Default<T> {
+    private final BeamFnStateClient beamFnStateClient;
+    private final StateRequest stateRequestForFirstChunk;
+    private final Coder<T> valueCoder;
+
+    public UncachedStateIterable(
+        BeamFnStateClient beamFnStateClient,
+        StateRequest stateRequestForFirstChunk,
+        Coder<T> valueCoder) {
+      this.beamFnStateClient = beamFnStateClient;
+      this.stateRequestForFirstChunk = stateRequestForFirstChunk;
+      this.valueCoder = valueCoder;
+    }
+
+    @Override
+    public PrefetchableIterator<T> createIterator() {
+      return new DecodingIterator<T>(
+          new LazyBlockingStateFetchingIterator(beamFnStateClient, 
stateRequestForFirstChunk),
+          valueCoder);
+    }
+
+    private static class DecodingIterator<T> extends AbstractIterator<T>
+        implements PrefetchableIterator<T> {
+      PrefetchableIterator<ByteString> chunkIterator;
+      InputStream currentChunk;
+      Coder<T> valueCoder;
+
+      public DecodingIterator(PrefetchableIterator<ByteString> chunkIterator, 
Coder<T> valueCoder) {
+        this.chunkIterator = chunkIterator;
+        this.currentChunk = ByteString.EMPTY.newInput();
+        this.valueCoder = valueCoder;
+      }
+
+      @Override
+      protected T computeNext() {
+        try {
+          while (currentChunk.available() == 0) {
+            if (chunkIterator.hasNext()) {
+              currentChunk = chunkIterator.next().newInput();
+            } else {
+              return endOfData();
+            }
+          }
+          return valueCoder.decode(currentChunk);
+        } catch (IOException exn) {
+          // Should never get here as ByteString.newInput() returns 
InputStreams
+          // that don't do actual IO operations.
+          throw new IllegalStateException(exn);
+        }
+      }
+
+      @Override
+      public boolean isReady() {
+        try {
+          return currentChunk.available() > 0 || chunkIterator.isReady();
+        } catch (IOException exn) {
+          throw new IllegalStateException(exn);

Review Comment:
   add same comment



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java:
##########
@@ -91,6 +94,82 @@ public static <T> CachingStateIterable<T> 
readAllAndDecodeStartingFrom(
         valueCoder);
   }
 
+  public static <T> UncachedStateIterable<T> readAllAndDecodeStartingFrom(
+      BeamFnStateClient beamFnStateClient,
+      StateRequest stateRequestForFirstChunk,
+      Coder<T> valueCoder) {
+    return new UncachedStateIterable<>(beamFnStateClient, 
stateRequestForFirstChunk, valueCoder);
+  }
+
+  static class UncachedStateIterable<T> extends 
PrefetchableIterables.Default<T> {
+    private final BeamFnStateClient beamFnStateClient;
+    private final StateRequest stateRequestForFirstChunk;
+    private final Coder<T> valueCoder;
+
+    public UncachedStateIterable(
+        BeamFnStateClient beamFnStateClient,
+        StateRequest stateRequestForFirstChunk,
+        Coder<T> valueCoder) {
+      this.beamFnStateClient = beamFnStateClient;
+      this.stateRequestForFirstChunk = stateRequestForFirstChunk;
+      this.valueCoder = valueCoder;
+    }
+
+    @Override
+    public PrefetchableIterator<T> createIterator() {
+      return new DecodingIterator<T>(
+          new LazyBlockingStateFetchingIterator(beamFnStateClient, 
stateRequestForFirstChunk),
+          valueCoder);
+    }
+
+    private static class DecodingIterator<T> extends AbstractIterator<T>
+        implements PrefetchableIterator<T> {
+      PrefetchableIterator<ByteString> chunkIterator;

Review Comment:
   make all members private, can make iterator and valueCoder final



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to