gemini-code-assist[bot] commented on code in PR #39085:
URL: https://github.com/apache/beam/pull/39085#discussion_r3466849490
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -224,14 +228,48 @@ public void onResponse(StreamingCommitResponse response) {
failureHandler.throwIfNonEmpty();
}
- @Override
@SuppressWarnings("ReferenceEquality")
+ private boolean belongsToThisHandler(StreamAndRequest streamAndRequest) {
+ return streamAndRequest.handler == this;
+ }
+
+ @Override
public boolean hasPendingRequests() {
- return pending.entrySet().stream().anyMatch(e -> e.getValue().handler == this);
+ return pending.entrySet().stream().anyMatch(e -> belongsToThisHandler(e.getValue()));
}
@Override
+ @SuppressWarnings("ReferenceEquality")
public void onDone(Status status) {
+ if (maxRetryDuration.compareTo(Duration.ZERO) > 0) {
+ // Remove the requests that have exceeded the retry time so they are not retried.
+ long startTimeRetryThresholdNanos = System.nanoTime() - maxRetryDuration.toNanos();
+ Iterator<Map.Entry<Long, StreamAndRequest>> iterator = pending.entrySet().iterator();
+ int keptRequests = 0, removedRequests = 0;
+ while (iterator.hasNext()) {
+ StreamAndRequest streamAndRequest = checkNotNull(iterator.next().getValue());
+ PendingRequest pendingRequest = streamAndRequest.request;
+ if (!belongsToThisHandler(streamAndRequest)
+ || pendingRequest.getStartTimeNanos() > startTimeRetryThresholdNanos) {
+ ++keptRequests;
+ continue;
+ }
+ ++removedRequests;
+ try {
+ pendingRequest.completeWithStatus(CommitStatus.ABORTED);
+ iterator.remove();
+ } catch (RuntimeException e) {
+ // Ignore exceptions and retry the commit.
+ LOG.warn("Exception while aborting commit due to retry timeout.", e);
+ }
+ }
Review Comment:

There are two issues in this retry-timeout cleanup block:
1. **Robustness & Memory Leak**: If `pendingRequest.completeWithStatus(CommitStatus.ABORTED)` throws a `RuntimeException`, `iterator.remove()` is skipped. The timed-out request then stays in the `pending` map, so it is retried on stream reconnect and may leak memory. Call `iterator.remove()` first, before invoking the callback.
2. **`System.nanoTime()` Comparison Safety**: `System.nanoTime()` counts from a fixed but arbitrary origin, so its readings can fall anywhere in the `long` range, including negative values, and overflow does not depend on how long the JVM has been running. Subtracting `maxRetryDuration.toNanos()` from a reading and then comparing absolute values with `>` gives the wrong answer when the values cross the `long` overflow boundary. The `System.nanoTime()` Javadoc recommends comparing differences instead: `System.nanoTime() - pendingRequest.getStartTimeNanos() < maxRetryDurationNanos` stays correct across overflow (for elapsed times under about 292 years) and is also more readable, since it directly expresses the elapsed time.
```java
if (maxRetryDuration.compareTo(Duration.ZERO) > 0) {
  // Remove the requests that have exceeded the retry time so they are not retried.
  long maxRetryDurationNanos = maxRetryDuration.toNanos();
  Iterator<Map.Entry<Long, StreamAndRequest>> iterator = pending.entrySet().iterator();
  int keptRequests = 0, removedRequests = 0;
  while (iterator.hasNext()) {
    StreamAndRequest streamAndRequest = checkNotNull(iterator.next().getValue());
    PendingRequest pendingRequest = streamAndRequest.request;
    // Compare elapsed time (a difference), which is safe across nanoTime overflow.
    if (!belongsToThisHandler(streamAndRequest)
        || System.nanoTime() - pendingRequest.getStartTimeNanos() < maxRetryDurationNanos) {
      ++keptRequests;
      continue;
    }
    ++removedRequests;
    // Remove first so a throwing callback cannot leave the request in `pending`.
    iterator.remove();
    try {
      pendingRequest.completeWithStatus(CommitStatus.ABORTED);
    } catch (RuntimeException e) {
      LOG.warn("Exception while aborting commit due to retry timeout.", e);
    }
  }
```
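To make both points concrete, here is a minimal, self-contained sketch. It is not Beam code: `RetryTimeoutSketch`, `Pending`, `notExpiredThresholdStyle`, `expired`, and `abortExpired` are hypothetical names, a plain `Runnable` stands in for `completeWithStatus(CommitStatus.ABORTED)`, and clock readings are passed in explicitly so the overflow case can be reproduced without waiting for a real `System.nanoTime()` wrap. It shows the threshold-style check from the diff misclassifying a request whose start and current readings straddle the `long` overflow boundary, and the remove-before-callback ordering leaving no stale entry behind when a callback throws.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified model of the retry-timeout cleanup; not Beam's actual classes. */
final class RetryTimeoutSketch {

  /** Hypothetical stand-in for PendingRequest: a start time plus an abort callback. */
  static final class Pending {
    final long startTimeNanos;
    final Runnable onAbort; // stands in for completeWithStatus(CommitStatus.ABORTED)

    Pending(long startTimeNanos, Runnable onAbort) {
      this.startTimeNanos = startTimeNanos;
      this.onAbort = onAbort;
    }
  }

  /** Mirrors the diff's check (keep if start > now - max); wrong across the overflow boundary. */
  static boolean notExpiredThresholdStyle(long nowNanos, long startNanos, long maxNanos) {
    return startNanos > nowNanos - maxNanos;
  }

  /** Overflow-safe check: compare the elapsed time (a difference), not absolute readings. */
  static boolean expired(long nowNanos, long startNanos, long maxNanos) {
    return nowNanos - startNanos >= maxNanos;
  }

  /** Removes and aborts expired entries; returns how many were removed. */
  static int abortExpired(Map<Long, Pending> pending, long nowNanos, long maxNanos) {
    int removed = 0;
    Iterator<Map.Entry<Long, Pending>> iterator = pending.entrySet().iterator();
    while (iterator.hasNext()) {
      Pending request = iterator.next().getValue();
      if (!expired(nowNanos, request.startTimeNanos, maxNanos)) {
        continue;
      }
      // Remove first, so a throwing callback cannot leave the entry in the map.
      iterator.remove();
      removed++;
      try {
        request.onAbort.run();
      } catch (RuntimeException e) {
        System.err.println("Exception while aborting request: " + e.getMessage());
      }
    }
    return removed;
  }

  public static void main(String[] args) {
    long maxNanos = 10;

    // Start just below Long.MAX_VALUE; 20 ns later the reading has wrapped to a negative value.
    long start = Long.MAX_VALUE - 5;
    long now = start + 20;
    System.out.println("threshold-style keeps request: "
        + notExpiredThresholdStyle(now, start, maxNanos)); // true (wrong: 20 ns > 10 ns)
    System.out.println("elapsed-style expired: " + expired(now, start, maxNanos)); // true

    Map<Long, Pending> pending = new LinkedHashMap<>();
    pending.put(1L, new Pending(0L, () -> { throw new IllegalStateException("callback failed"); }));
    pending.put(2L, new Pending(0L, () -> {}));
    pending.put(3L, new Pending(95L, () -> {})); // only 5 ns old at now=100, so it is kept
    int removed = abortExpired(pending, 100L, maxNanos);
    System.out.println("removed=" + removed + ", remaining=" + pending.keySet()); // removed=2, remaining=[3]
  }
}
```

The difference-based check is only guaranteed while the true elapsed time stays under about 2^63 ns (roughly 292 years), which is the bound the `System.nanoTime()` Javadoc gives. Removing before the callback means a request whose abort callback throws is dropped rather than retried; that is the behavior the suggestion above adopts, whereas the comment in the original diff ("Ignore exceptions and retry the commit.") describes retrying instead.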