reuvenlax commented on code in PR #22461:
URL: https://github.com/apache/beam/pull/22461#discussion_r930496722


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java:
##########
@@ -453,30 +453,40 @@ private <ResultT, ContinuationT> 
Future<Iterable<ResultT>> valuesToPagingIterabl
   public void startBatchAndBlock() {
     // First, drain work out of the pending lookups into a set. These will be 
the items we fetch.
     HashSet<StateTag<?>> toFetch = Sets.newHashSet();
-    while (!pendingLookups.isEmpty()) {
-      StateTag<?> stateTag = pendingLookups.poll();
-      if (stateTag == null) {
-        break;
+    try {
+      while (!pendingLookups.isEmpty()) {
+        StateTag<?> stateTag = pendingLookups.poll();
+        if (stateTag == null) {
+          break;
+        }
+
+        if (!toFetch.add(stateTag)) {
+          throw new IllegalStateException("Duplicate tags being fetched.");
+        }
       }
 
-      if (!toFetch.add(stateTag)) {
-        throw new IllegalStateException("Duplicate tags being fetched.");
+      // If we failed to drain anything, some other thread pulled it off the 
queue. We have no work
+      // to do.
+      if (toFetch.isEmpty()) {
+        return;
       }
-    }
 
-    // If we failed to drain anything, some other thread pulled it off the 
queue. We have no work
-    // to do.
-    if (toFetch.isEmpty()) {
-      return;
-    }
+      Windmill.KeyedGetDataRequest request = createRequest(toFetch);
+      Windmill.KeyedGetDataResponse response = 
server.getStateData(computation, request);
+      if (response == null) {
+        throw new RuntimeException("Windmill unexpectedly returned null for 
request " + request);
+      }
 
-    Windmill.KeyedGetDataRequest request = createRequest(toFetch);
-    Windmill.KeyedGetDataResponse response = server.getStateData(computation, 
request);
-    if (response == null) {
-      throw new RuntimeException("Windmill unexpectedly returned null for 
request " + request);
+      // Removes tags from toFetch as they are processed.
+      consumeResponse(response, toFetch);
+    } catch (RuntimeException e) {

Review Comment:
   Why are you catching RuntimeException and not Exception?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to