lukecwik commented on a change in pull request #16057:
URL: https://github.com/apache/beam/pull/16057#discussion_r756240751



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
##########
@@ -265,48 +258,39 @@ public void prefetch() {
 
     @Override
     public boolean hasNext() {
-      switch (currentState) {
-        case EOF:
-          return false;
-        case READ_REQUIRED:
-          prefetch();
-          StateResponse stateResponse;
-          try {
-            stateResponse = prefetchedResponse.get();
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IllegalStateException(e);
-          } catch (ExecutionException e) {
-            if (e.getCause() == null) {
-              throw new IllegalStateException(e);
-            }
-            Throwables.throwIfUnchecked(e.getCause());
-            throw new IllegalStateException(e.getCause());
-          }
-          prefetchedResponse = null;
-          continuationToken = stateResponse.getGet().getContinuationToken();
-          next = stateResponse.getGet().getData();
-          currentState = State.HAS_NEXT;
-          return true;
-        case HAS_NEXT:
-          return true;
-      }
-      throw new IllegalStateException(String.format("Unknown state %s", 
currentState));
+      return moreToRead;
     }
 
     @Override
     public ByteString next() {
       if (!hasNext()) {
         throw new NoSuchElementException();
       }
+
+      prefetch();
+      StateResponse stateResponse;
+      try {
+        stateResponse = prefetchedResponse.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(e);
+      } catch (ExecutionException e) {
+        if (e.getCause() == null) {
+          throw new IllegalStateException(e);
+        }
+        Throwables.throwIfUnchecked(e.getCause());
+        throw new IllegalStateException(e.getCause());
+      }
+      prefetchedResponse = null;
+      continuationToken = stateResponse.getGet().getContinuationToken();
+
       // If the continuation token is empty, that means we have reached EOF.
       if (ByteString.EMPTY.equals(continuationToken)) {
-        currentState = State.EOF;
+        moreToRead = false;
       } else {
-        currentState = State.READ_REQUIRED;
         prefetch();

Review comment:
       Yes. We want to ensure that we start fetching the next block while the 
current one is being processed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to