scwhittle commented on code in PR #34746: URL: https://github.com/apache/beam/pull/34746#discussion_r2075205935
########## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java: ########## @@ -91,6 +94,82 @@ public static <T> CachingStateIterable<T> readAllAndDecodeStartingFrom( valueCoder); } + public static <T> UncachedStateIterable<T> readAllAndDecodeStartingFrom( + BeamFnStateClient beamFnStateClient, + StateRequest stateRequestForFirstChunk, + Coder<T> valueCoder) { + return new UncachedStateIterable<>(beamFnStateClient, stateRequestForFirstChunk, valueCoder); + } + + static class UncachedStateIterable<T> extends PrefetchableIterables.Default<T> { Review Comment: It is used for BagUserState, MultimapState etc. https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L76 ########## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java: ########## @@ -91,6 +94,82 @@ public static <T> CachingStateIterable<T> readAllAndDecodeStartingFrom( valueCoder); } + public static <T> UncachedStateIterable<T> readAllAndDecodeStartingFrom( Review Comment: add comment, could copy relevant info from the caching method ########## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java: ########## @@ -91,6 +94,82 @@ public static <T> CachingStateIterable<T> readAllAndDecodeStartingFrom( valueCoder); } + public static <T> UncachedStateIterable<T> readAllAndDecodeStartingFrom( + BeamFnStateClient beamFnStateClient, + StateRequest stateRequestForFirstChunk, + Coder<T> valueCoder) { + return new UncachedStateIterable<>(beamFnStateClient, stateRequestForFirstChunk, valueCoder); + } + + static class UncachedStateIterable<T> extends PrefetchableIterables.Default<T> { + private final BeamFnStateClient beamFnStateClient; + private final StateRequest stateRequestForFirstChunk; + private final Coder<T> valueCoder; + + public UncachedStateIterable( + BeamFnStateClient beamFnStateClient, + StateRequest stateRequestForFirstChunk, + Coder<T> valueCoder) { + this.beamFnStateClient = beamFnStateClient; + this.stateRequestForFirstChunk = stateRequestForFirstChunk; + this.valueCoder = valueCoder; + } + + @Override + public PrefetchableIterator<T> createIterator() { + return new DecodingIterator<T>( + new LazyBlockingStateFetchingIterator(beamFnStateClient, stateRequestForFirstChunk), + valueCoder); + } + + private static class DecodingIterator<T> extends AbstractIterator<T> + implements PrefetchableIterator<T> { + PrefetchableIterator<ByteString> chunkIterator; + InputStream currentChunk; + Coder<T> valueCoder; + + public DecodingIterator(PrefetchableIterator<ByteString> chunkIterator, Coder<T> valueCoder) { + this.chunkIterator = chunkIterator; + this.currentChunk = ByteString.EMPTY.newInput(); + this.valueCoder = valueCoder; + } + + @Override + protected T computeNext() { + try { + while (currentChunk.available() == 0) { + if (chunkIterator.hasNext()) { + currentChunk = chunkIterator.next().newInput(); + } else { + return endOfData(); + } + } + return valueCoder.decode(currentChunk); + } catch (IOException exn) { + // Should never get here as ByteString.newInput() returns InputStreams + // that don't do actual IO operations. + throw new IllegalStateException(exn); + } + } + + @Override + public boolean isReady() { + try { + return currentChunk.available() > 0 || chunkIterator.isReady(); + } catch (IOException exn) { + throw new IllegalStateException(exn); Review Comment: add same comment ########## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java: ########## @@ -91,6 +94,82 @@ public static <T> CachingStateIterable<T> readAllAndDecodeStartingFrom( valueCoder); } + public static <T> UncachedStateIterable<T> readAllAndDecodeStartingFrom( + BeamFnStateClient beamFnStateClient, + StateRequest stateRequestForFirstChunk, + Coder<T> valueCoder) { + return new UncachedStateIterable<>(beamFnStateClient, stateRequestForFirstChunk, valueCoder); + } + + static class UncachedStateIterable<T> extends PrefetchableIterables.Default<T> { + private final BeamFnStateClient beamFnStateClient; + private final StateRequest stateRequestForFirstChunk; + private final Coder<T> valueCoder; + + public UncachedStateIterable( + BeamFnStateClient beamFnStateClient, + StateRequest stateRequestForFirstChunk, + Coder<T> valueCoder) { + this.beamFnStateClient = beamFnStateClient; + this.stateRequestForFirstChunk = stateRequestForFirstChunk; + this.valueCoder = valueCoder; + } + + @Override + public PrefetchableIterator<T> createIterator() { + return new DecodingIterator<T>( + new LazyBlockingStateFetchingIterator(beamFnStateClient, stateRequestForFirstChunk), + valueCoder); + } + + private static class DecodingIterator<T> extends AbstractIterator<T> + implements PrefetchableIterator<T> { + PrefetchableIterator<ByteString> chunkIterator; Review Comment: make all members private, can make iterator and valueCoder final -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org