This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e6e9b83 Prefetch subsequent pages over the FnAPI. (#14803)
e6e9b83 is described below
commit e6e9b83d39822344d9259a22d8dbcf117fcf024e
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Jun 18 10:15:57 2021 -0700
Prefetch subsequent pages over the FnAPI. (#14803)
We still want to be lazy when the object is created, as we don't
know if we'll want to iterate over it at all. But once we've
started iterating, in most cases we'll want to read the entire
iterable, so it makes sense to issue the next state request
concurrently with processing the current page.
---
.../fn/harness/state/StateFetchingIterators.java | 33 +++++++++++++++-------
.../harness/state/StateFetchingIteratorsTest.java | 9 +++++-
2 files changed, 31 insertions(+), 11 deletions(-)
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
index dbca4a0..79a4297 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
@@ -173,8 +173,8 @@ public class StateFetchingIterators {
/**
* An {@link Iterator} which fetches {@link ByteString} chunks using the
State API.
*
- * <p>This iterator will only request a chunk on first access. Also it does
not eagerly pre-fetch
- * any future chunks and blocks whenever required to fetch the next block.
+ * <p>This iterator will only request a chunk on first access. Subsequently
it eagerly pre-fetches
+ * one future chunk at a time.
*/
static class LazyBlockingStateFetchingIterator implements
Iterator<ByteString> {
@@ -189,6 +189,7 @@ public class StateFetchingIterators {
private State currentState;
private ByteString continuationToken;
private ByteString next;
+ private CompletableFuture<StateResponse> prefetchedResponse;
LazyBlockingStateFetchingIterator(
BeamFnStateClient beamFnStateClient, StateRequest
stateRequestForFirstChunk) {
@@ -198,21 +199,27 @@ public class StateFetchingIterators {
this.continuationToken =
stateRequestForFirstChunk.getGet().getContinuationToken();
}
+ private void prefetch() {
+ if (prefetchedResponse == null && currentState == State.READ_REQUIRED) {
+ prefetchedResponse = new CompletableFuture<>();
+ beamFnStateClient.handle(
+ stateRequestForFirstChunk
+ .toBuilder()
+
.setGet(StateGetRequest.newBuilder().setContinuationToken(continuationToken)),
+ prefetchedResponse);
+ }
+ }
+
@Override
public boolean hasNext() {
switch (currentState) {
case EOF:
return false;
case READ_REQUIRED:
- CompletableFuture<StateResponse> stateResponseFuture = new
CompletableFuture<>();
- beamFnStateClient.handle(
- stateRequestForFirstChunk
- .toBuilder()
-
.setGet(StateGetRequest.newBuilder().setContinuationToken(continuationToken)),
- stateResponseFuture);
+ prefetch();
StateResponse stateResponse;
try {
- stateResponse = stateResponseFuture.get();
+ stateResponse = prefetchedResponse.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
@@ -223,6 +230,7 @@ public class StateFetchingIterators {
Throwables.throwIfUnchecked(e.getCause());
throw new IllegalStateException(e.getCause());
}
+ prefetchedResponse = null;
continuationToken = stateResponse.getGet().getContinuationToken();
next = stateResponse.getGet().getData();
currentState = State.HAS_NEXT;
@@ -239,7 +247,12 @@ public class StateFetchingIterators {
throw new NoSuchElementException();
}
// If the continuation token is empty, that means we have reached EOF.
- currentState = ByteString.EMPTY.equals(continuationToken) ? State.EOF :
State.READ_REQUIRED;
+ if (ByteString.EMPTY.equals(continuationToken)) {
+ currentState = State.EOF;
+ } else {
+ currentState = State.READ_REQUIRED;
+ prefetch();
+ }
return next;
}
}
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
index b4f0528..fc729cc 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
@@ -162,7 +162,7 @@ public class StateFetchingIteratorsTest {
Iterator<T> valuesIter = values.iterator();
assertEquals(0, callCount.get());
- // No more is read than necissary.
+ // No more is read than necessary.
if (valuesIter.hasNext()) {
valuesIter.next();
}
@@ -176,6 +176,13 @@ public class StateFetchingIteratorsTest {
}
assertEquals(1, callCount.get());
+ if (valuesIter.hasNext()) {
+ valuesIter.next();
+ // Subsequent pages are pre-fetched, so after accessing the second
page,
+ // the third should be requested.
+ assertEquals(3, callCount.get());
+ }
+
// The contents agree.
assertArrayEquals(expected, Iterables.toArray(values, Object.class));
}