gemini-code-assist[bot] commented on code in PR #39085:
URL: https://github.com/apache/beam/pull/39085#discussion_r3466903562
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -224,14 +228,49 @@ public void onResponse(StreamingCommitResponse response) {
failureHandler.throwIfNonEmpty();
}
- @Override
@SuppressWarnings("ReferenceEquality")
+ private boolean belongsToThisHandler(StreamAndRequest streamAndRequest) {
+ return streamAndRequest.handler == this;
+ }
+
+ @Override
public boolean hasPendingRequests() {
- return pending.entrySet().stream().anyMatch(e -> e.getValue().handler == this);
+ return pending.entrySet().stream().anyMatch(e -> belongsToThisHandler(e.getValue()));
}
@Override
+ @SuppressWarnings("ReferenceEquality")
public void onDone(Status status) {
+ if (maxRetryDuration.compareTo(Duration.ZERO) > 0) {
+ // Remove the requests that have exceeded the retry time so they are not retried.
+ long nowNanos = System.nanoTime();
+ long maxRetryDurationNanos = maxRetryDuration.toNanos();
+ Iterator<Map.Entry<Long, StreamAndRequest>> iterator = pending.entrySet().iterator();
+ int keptRequests = 0, removedRequests = 0;
+ while (iterator.hasNext()) {
+ StreamAndRequest streamAndRequest = checkNotNull(iterator.next().getValue());
+ PendingRequest pendingRequest = streamAndRequest.request;
+ if (!belongsToThisHandler(streamAndRequest)
+ || nowNanos - pendingRequest.getStartTimeNanos() < maxRetryDurationNanos) {
+ ++keptRequests;
+ continue;
+ }
+ ++removedRequests;
+ iterator.remove();
+ try {
+ pendingRequest.completeWithStatus(CommitStatus.ABORTED);
+ } catch (RuntimeException e) {
+ // Ignore exceptions and retry the commit.
+ LOG.warn("Exception while aborting commit due to retry timeout.", e);
+ }
Review Comment:

The comment `// Ignore exceptions and retry the commit.` is misleading.
Since `iterator.remove()` was already called on line 259, the commit has been
removed from the `pending` map and will **not** be retried. The comment should
be updated to clarify that we are ignoring the exception and continuing to
abort other expired commits.
```suggestion
} catch (RuntimeException e) {
  // Ignore exceptions and continue aborting other expired commits.
  LOG.warn("Exception while aborting commit due to retry timeout.", e);
}
```
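
To illustrate the point about `iterator.remove()`, here is a minimal, self-contained sketch of the same remove-then-abort pattern. It is not the Beam code: `ExpiredCommitSketch`, `FakeRequest`, and `abortExpired` are hypothetical names invented for this example, and a `TreeMap` stands in for the real `pending` map only to keep iteration order deterministic. It suggests that an entry removed through the iterator stays removed even when its abort callback throws, so the catch block can only skip the failure and move on to the next expired entry.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ExpiredCommitSketch {
  /** Hypothetical stand-in for a pending commit; not Beam's PendingRequest. */
  static final class FakeRequest {
    final long startTimeNanos;
    final boolean throwOnAbort;
    boolean aborted = false;

    FakeRequest(long startTimeNanos, boolean throwOnAbort) {
      this.startTimeNanos = startTimeNanos;
      this.throwOnAbort = throwOnAbort;
    }

    void abort() {
      aborted = true;
      if (throwOnAbort) {
        throw new IllegalStateException("abort callback failed");
      }
    }
  }

  /** Removes and aborts expired requests; returns the ids that were removed. */
  static List<Long> abortExpired(
      Map<Long, FakeRequest> pending, long nowNanos, long maxRetryNanos) {
    List<Long> removedIds = new ArrayList<>();
    Iterator<Map.Entry<Long, FakeRequest>> iterator = pending.entrySet().iterator();
    while (iterator.hasNext()) {
      Map.Entry<Long, FakeRequest> entry = iterator.next();
      Long id = entry.getKey();
      FakeRequest request = entry.getValue();
      if (nowNanos - request.startTimeNanos < maxRetryNanos) {
        continue; // Still within the retry window: keep it for a later retry.
      }
      // From here on the request is gone from the map and cannot be retried.
      iterator.remove();
      removedIds.add(id);
      try {
        request.abort();
      } catch (RuntimeException e) {
        // Ignore the failure and continue aborting other expired requests.
      }
    }
    return removedIds;
  }

  public static void main(String[] args) {
    Map<Long, FakeRequest> pending = new TreeMap<>();
    pending.put(1L, new FakeRequest(0L, true)); // expired, abort throws
    pending.put(2L, new FakeRequest(0L, false)); // expired, abort succeeds
    pending.put(3L, new FakeRequest(90L, false)); // still within the window
    List<Long> removedIds = abortExpired(pending, 100L, 50L);
    System.out.println("removed=" + removedIds + " remaining=" + pending.keySet());
  }
}
```

In this sketch, request 1 throws from its abort callback, yet it is no longer in the map afterwards and request 2 is still processed, which is the behavior the reworded comment describes.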