scwhittle commented on code in PR #22461:
URL: https://github.com/apache/beam/pull/22461#discussion_r930501127
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java:
##########
@@ -453,30 +453,40 @@ private <ResultT, ContinuationT> Future<Iterable<ResultT>> valuesToPagingIterabl
public void startBatchAndBlock() {
// First, drain work out of the pending lookups into a set. These will be the items we fetch.
HashSet<StateTag<?>> toFetch = Sets.newHashSet();
- while (!pendingLookups.isEmpty()) {
- StateTag<?> stateTag = pendingLookups.poll();
- if (stateTag == null) {
- break;
+ try {
+ while (!pendingLookups.isEmpty()) {
+ StateTag<?> stateTag = pendingLookups.poll();
+ if (stateTag == null) {
+ break;
+ }
+
+ if (!toFetch.add(stateTag)) {
+ throw new IllegalStateException("Duplicate tags being fetched.");
+ }
}
- if (!toFetch.add(stateTag)) {
- throw new IllegalStateException("Duplicate tags being fetched.");
+ // If we failed to drain anything, some other thread pulled it off the queue. We have no work
+ // to do.
+ if (toFetch.isEmpty()) {
+ return;
}
- }
- // If we failed to drain anything, some other thread pulled it off the queue. We have no work
- // to do.
- if (toFetch.isEmpty()) {
- return;
- }
+ Windmill.KeyedGetDataRequest request = createRequest(toFetch);
+ Windmill.KeyedGetDataResponse response = server.getStateData(computation, request);
+ if (response == null) {
+ throw new RuntimeException("Windmill unexpectedly returned null for request " + request);
+ }
- Windmill.KeyedGetDataRequest request = createRequest(toFetch);
- Windmill.KeyedGetDataResponse response = server.getStateData(computation, request);
- if (response == null) {
- throw new RuntimeException("Windmill unexpectedly returned null for request " + request);
+ // Removes tags from toFetch as they are processed.
+ consumeResponse(response, toFetch);
+ } catch (RuntimeException e) {
Review Comment:
No good reason, changed :)
I had assumed only RuntimeExceptions could be thrown here, since this function
isn't otherwise declared as throwing an exception. However, catching Exception
is more future-proof if non-RuntimeExceptions are added down the road.
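
For illustration only, here is a minimal standalone sketch of why the broader
catch is more robust. It is not the WindmillStateReader code: the class
BroadCatchSketch, the method fetchAndFailPendingOnError, and the use of
CompletableFuture and Callable are all hypothetical stand-ins. The assumption
is that the catch block exists to fail any still-pending lookups so callers are
not left waiting; the diff above does not show the catch body.

```java
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; names and types are not taken from Beam.
class BroadCatchSketch {

  // Runs the fetch and completes matching futures. On any failure, checked or
  // unchecked, every future still in `pending` is completed exceptionally.
  static void fetchAndFailPendingOnError(
      Map<String, CompletableFuture<String>> pending,
      Callable<Map<String, String>> fetch) {
    try {
      // Callable.call() is declared to throw Exception, standing in for a
      // code path that later gains a checked exception.
      Map<String, String> results = fetch.call();
      for (Map.Entry<String, String> entry : results.entrySet()) {
        // Remove entries as they are processed, so only unprocessed ones remain.
        CompletableFuture<String> future = pending.remove(entry.getKey());
        if (future != null) {
          future.complete(entry.getValue());
        }
      }
    } catch (Exception e) {
      // catch (RuntimeException e) would not handle the IOException used in main.
      for (CompletableFuture<String> future : pending.values()) {
        future.completeExceptionally(e);
      }
      pending.clear();
    }
  }

  public static void main(String[] args) {
    Map<String, CompletableFuture<String>> pending = new LinkedHashMap<>();
    CompletableFuture<String> lookup = new CompletableFuture<>();
    pending.put("tagA", lookup);

    // Simulate a fetch that fails with a checked exception.
    fetchAndFailPendingOnError(
        pending,
        () -> {
          throw new IOException("simulated fetch failure");
        });

    System.out.println("failed: " + lookup.isCompletedExceptionally());
  }
}
```

With catch (RuntimeException e) in place of catch (Exception e), this sketch
would not compile unless the method also declared throws Exception, and the
pending future would then never be completed on that path.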