reuvenlax commented on code in PR #22461:
URL: https://github.com/apache/beam/pull/22461#discussion_r930496722
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java:
##########
@@ -453,30 +453,40 @@ private <ResultT, ContinuationT>
Future<Iterable<ResultT>> valuesToPagingIterabl
public void startBatchAndBlock() {
// First, drain work out of the pending lookups into a set. These will be
the items we fetch.
HashSet<StateTag<?>> toFetch = Sets.newHashSet();
- while (!pendingLookups.isEmpty()) {
- StateTag<?> stateTag = pendingLookups.poll();
- if (stateTag == null) {
- break;
+ try {
+ while (!pendingLookups.isEmpty()) {
+ StateTag<?> stateTag = pendingLookups.poll();
+ if (stateTag == null) {
+ break;
+ }
+
+ if (!toFetch.add(stateTag)) {
+ throw new IllegalStateException("Duplicate tags being fetched.");
+ }
}
- if (!toFetch.add(stateTag)) {
- throw new IllegalStateException("Duplicate tags being fetched.");
+ // If we failed to drain anything, some other thread pulled it off the
queue. We have no work
+ // to do.
+ if (toFetch.isEmpty()) {
+ return;
}
- }
- // If we failed to drain anything, some other thread pulled it off the
queue. We have no work
- // to do.
- if (toFetch.isEmpty()) {
- return;
- }
+ Windmill.KeyedGetDataRequest request = createRequest(toFetch);
+ Windmill.KeyedGetDataResponse response =
server.getStateData(computation, request);
+ if (response == null) {
+ throw new RuntimeException("Windmill unexpectedly returned null for
request " + request);
+ }
- Windmill.KeyedGetDataRequest request = createRequest(toFetch);
- Windmill.KeyedGetDataResponse response = server.getStateData(computation,
request);
- if (response == null) {
- throw new RuntimeException("Windmill unexpectedly returned null for
request " + request);
+ // Removes tags from toFetch as they are processed.
+ consumeResponse(response, toFetch);
+ } catch (RuntimeException e) {
Review Comment:
Why are you catching RuntimeException and not Exception?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]