lukecwik commented on a change in pull request #15475:
URL: https://github.com/apache/beam/pull/15475#discussion_r762217256



##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
##########
@@ -77,49 +121,55 @@ public void testMultiWithEmptyByteStrings() throws 
Exception {
           ByteString.EMPTY);
     }
 
-    private BeamFnStateClient fakeStateClient(AtomicInteger callCount, 
ByteString... expected) {
-      return (requestBuilder, response) -> {
-        callCount.incrementAndGet();
-        if (expected.length == 0) {
-          response.complete(
-              StateResponse.newBuilder()
-                  .setId(requestBuilder.getId())
-                  .setGet(StateGetResponse.newBuilder())
-                  .build());
-          return;
-        }
-
-        ByteString continuationToken = 
requestBuilder.getGet().getContinuationToken();
-
-        int requestedPosition = 0; // Default position is 0
-        if (!ByteString.EMPTY.equals(continuationToken)) {
-          requestedPosition = 
Integer.parseInt(continuationToken.toStringUtf8());
-        }
-
-        // Compute the new continuation token
-        ByteString newContinuationToken = ByteString.EMPTY;
-        if (requestedPosition != expected.length - 1) {
-          newContinuationToken = 
ByteString.copyFromUtf8(Integer.toString(requestedPosition + 1));
-        }
-        response.complete(
-            StateResponse.newBuilder()
-                .setId(requestBuilder.getId())
-                .setGet(
-                    StateGetResponse.newBuilder()
-                        .setData(expected[requestedPosition])
-                        .setContinuationToken(newContinuationToken))
-                .build());
-      };
+    @Test
+    public void testPrefetchIgnoredWhenExistingPrefetchOngoing() throws 
Exception {
+      AtomicInteger callCount = new AtomicInteger();
+      BeamFnStateClient fakeStateClient =
+          new BeamFnStateClient() {
+            @Override
+            public void handle(
+                StateRequest.Builder requestBuilder, 
CompletableFuture<StateResponse> response) {
+              callCount.incrementAndGet();
+            }
+          };
+      PrefetchableIterator<ByteString> byteStrings =
+          new LazyBlockingStateFetchingIterator(fakeStateClient, 
StateRequest.getDefaultInstance());
+      assertEquals(0, callCount.get());
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // first prefetch
+      byteStrings.prefetch();
+      assertEquals(1, callCount.get()); // subsequent is ignored
     }
 
     private void testFetch(ByteString... expected) {
       AtomicInteger callCount = new AtomicInteger();
       BeamFnStateClient fakeStateClient = fakeStateClient(callCount, expected);
-      Iterator<ByteString> byteStrings =
+      PrefetchableIterator<ByteString> byteStrings =
           new LazyBlockingStateFetchingIterator(fakeStateClient, 
StateRequest.getDefaultInstance());
       assertEquals(0, callCount.get()); // Ensure it's fully lazy.
-      assertArrayEquals(expected, Iterators.toArray(byteStrings, 
Object.class));
+      assertFalse(byteStrings.isReady());
+
+      // Prefetch every second element in the iterator capturing the results
+      List<ByteString> results = new ArrayList<>();

Review comment:
       I think that was a bug in the original test and now has been fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to