TheNeuralBit commented on a change in pull request #15475:
URL: https://github.com/apache/beam/pull/15475#discussion_r762215539
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
##########
@@ -77,49 +121,55 @@ public void testMultiWithEmptyByteStrings() throws
Exception {
ByteString.EMPTY);
}
- private BeamFnStateClient fakeStateClient(AtomicInteger callCount,
ByteString... expected) {
- return (requestBuilder, response) -> {
- callCount.incrementAndGet();
- if (expected.length == 0) {
- response.complete(
- StateResponse.newBuilder()
- .setId(requestBuilder.getId())
- .setGet(StateGetResponse.newBuilder())
- .build());
- return;
- }
-
- ByteString continuationToken =
requestBuilder.getGet().getContinuationToken();
-
- int requestedPosition = 0; // Default position is 0
- if (!ByteString.EMPTY.equals(continuationToken)) {
- requestedPosition =
Integer.parseInt(continuationToken.toStringUtf8());
- }
-
- // Compute the new continuation token
- ByteString newContinuationToken = ByteString.EMPTY;
- if (requestedPosition != expected.length - 1) {
- newContinuationToken =
ByteString.copyFromUtf8(Integer.toString(requestedPosition + 1));
- }
- response.complete(
- StateResponse.newBuilder()
- .setId(requestBuilder.getId())
- .setGet(
- StateGetResponse.newBuilder()
- .setData(expected[requestedPosition])
- .setContinuationToken(newContinuationToken))
- .build());
- };
+ @Test
+ public void testPrefetchIgnoredWhenExistingPrefetchOngoing() throws
Exception {
+ AtomicInteger callCount = new AtomicInteger();
+ BeamFnStateClient fakeStateClient =
+ new BeamFnStateClient() {
+ @Override
+ public void handle(
+ StateRequest.Builder requestBuilder,
CompletableFuture<StateResponse> response) {
+ callCount.incrementAndGet();
+ }
+ };
+ PrefetchableIterator<ByteString> byteStrings =
+ new LazyBlockingStateFetchingIterator(fakeStateClient,
StateRequest.getDefaultInstance());
+ assertEquals(0, callCount.get());
+ byteStrings.prefetch();
+ assertEquals(1, callCount.get()); // first prefetch
+ byteStrings.prefetch();
+ assertEquals(1, callCount.get()); // subsequent is ignored
}
private void testFetch(ByteString... expected) {
AtomicInteger callCount = new AtomicInteger();
BeamFnStateClient fakeStateClient = fakeStateClient(callCount, expected);
- Iterator<ByteString> byteStrings =
+ PrefetchableIterator<ByteString> byteStrings =
new LazyBlockingStateFetchingIterator(fakeStateClient,
StateRequest.getDefaultInstance());
assertEquals(0, callCount.get()); // Ensure it's fully lazy.
- assertArrayEquals(expected, Iterators.toArray(byteStrings,
Object.class));
+ assertFalse(byteStrings.isReady());
+
+ // Prefetch every second element in the iterator capturing the results
+ List<ByteString> results = new ArrayList<>();
Review comment:
@lukecwik did you intend to make an assertion on `results`?
The Error Prone checks in https://github.com/apache/beam/pull/15890 flag it
as unused.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]