gemini-code-assist[bot] commented on code in PR #39085:
URL: https://github.com/apache/beam/pull/39085#discussion_r3466139826
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java:
##########
@@ -196,6 +196,13 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {
void setStuckCommitDurationMillis(int value);
+ @Description(
+ "Retry commits on stream errors until this much time has elapsed since the commit was scheduled. If zero, retry forever.")
+ @Default.InstanceFactory(CommitWorkStreamRetryTimeoutMillisFactory.class)
+ long getCommitWorkStreamRetryTimeoutMillis();
+
+ void getCommitWorkStreamRetryTimeoutMillis(long value);
Review Comment:

The setter method is incorrectly named
`getCommitWorkStreamRetryTimeoutMillis` instead of
`setCommitWorkStreamRetryTimeoutMillis`. This violates JavaBean naming
conventions and will cause issues with `PipelineOptions` serialization and
command-line parsing.
```suggestion
void setCommitWorkStreamRetryTimeoutMillis(long value);
```
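To illustrate the naming rule, here is a standalone sketch that uses only `java.beans` (not Beam's own validation code, which may surface the problem differently): JavaBeans introspection pairs `getX()` with `setX(value)` into one writable property, while a second `getX(long)` overload is simply ignored, so the property ends up read-only. The class and interface names are hypothetical.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;

public class BeanNamingDemo {

  /** Hypothetical interface with a correctly named getter/setter pair. */
  public interface WellNamedOptions {
    long getCommitWorkStreamRetryTimeoutMillis();

    void setCommitWorkStreamRetryTimeoutMillis(long value);
  }

  /** Hypothetical interface mirroring the misnamed "setter" in this diff. */
  public interface MisnamedOptions {
    long getCommitWorkStreamRetryTimeoutMillis();

    // Overloads the getter name instead of using the "set" prefix.
    void getCommitWorkStreamRetryTimeoutMillis(long value);
  }

  /** Returns true if JavaBeans introspection finds a write method for the property. */
  public static boolean hasSetter(Class<?> type, String property) {
    try {
      for (PropertyDescriptor pd : Introspector.getBeanInfo(type).getPropertyDescriptors()) {
        if (pd.getName().equals(property)) {
          return pd.getWriteMethod() != null;
        }
      }
      return false;
    } catch (IntrospectionException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    String property = "commitWorkStreamRetryTimeoutMillis";
    // true: getX()/setX(long) form a read/write property.
    System.out.println(hasSetter(WellNamedOptions.class, property));
    // false: getX(long) is not recognized as a setter, so the property is read-only.
    System.out.println(hasSetter(MisnamedOptions.class, property));
  }
}
```

The exact failure mode in Beam (registration-time validation vs. a parse-time error) depends on `PipelineOptionsFactory`; this sketch only demonstrates the naming convention itself.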
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java:
##########
@@ -343,4 +350,15 @@ public Boolean create(PipelineOptions options) {
return ExperimentalOptions.hasExperiment(options,
"enable_windmill_service_direct_path");
}
}
+
+ /** defaults to false unless one of the experiment is set. */
Review Comment:

The Javadoc comment states that the option `defaults to false`, but the
factory actually returns a `Long` representing 30 minutes in milliseconds (or
`0L` if disabled). Update the comment to accurately describe the default value.
```suggestion
/** Defaults to 30 minutes unless the disable_commit_retry_timeout experiment is set. */
```
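For reference, a stdlib-only sketch of the default the updated Javadoc describes, assuming the factory returns 30 minutes in milliseconds, or `0L` (which the option's `@Description` defines as "retry forever") when the `disable_commit_retry_timeout` experiment from the suggestion is present. The real factory would implement Beam's `DefaultValueFactory` with a `create(PipelineOptions)` method and check experiments via `ExperimentalOptions.hasExperiment`; here a plain list of experiment names stands in for the options object, and `RetryTimeoutDefaults` is a hypothetical name.

```java
import java.time.Duration;
import java.util.List;

public class RetryTimeoutDefaults {

  /** Experiment name taken from the suggested Javadoc (assumed, not verified against the PR). */
  static final String DISABLE_EXPERIMENT = "disable_commit_retry_timeout";

  /** Assumed default: 30 minutes, expressed in milliseconds. */
  static final long DEFAULT_TIMEOUT_MILLIS = Duration.ofMinutes(30).toMillis();

  /**
   * Returns the default retry timeout in milliseconds: 30 minutes, or 0 (retry forever) when the
   * disabling experiment is enabled.
   */
  static long defaultRetryTimeoutMillis(List<String> experiments) {
    return experiments.contains(DISABLE_EXPERIMENT) ? 0L : DEFAULT_TIMEOUT_MILLIS;
  }

  public static void main(String[] args) {
    System.out.println(defaultRetryTimeoutMillis(List.of())); // 1800000
    System.out.println(defaultRetryTimeoutMillis(List.of(DISABLE_EXPERIMENT))); // 0
  }
}
```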
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -224,14 +228,50 @@ public void onResponse(StreamingCommitResponse response) {
failureHandler.throwIfNonEmpty();
}
- @Override
@SuppressWarnings("ReferenceEquality")
+ private boolean belongsToThisHandler(StreamAndRequest streamAndRequest) {
+ return streamAndRequest.handler == this;
+ }
+
+ @Override
public boolean hasPendingRequests() {
- return pending.entrySet().stream().anyMatch(e -> e.getValue().handler == this);
+ return pending.entrySet().stream().anyMatch(e -> belongsToThisHandler(e.getValue()));
}
@Override
+ @SuppressWarnings("ReferenceEquality")
public void onDone(Status status) {
+ if (maxRetryDuration.compareTo(Duration.ZERO) > 0) {
+ // Remove the requests that have exceeded the retry time so they are not retried.
+ long startTimeRetryThresholdMsec = System.currentTimeMillis() - maxRetryDuration.toMillis();
+ Iterator<Map.Entry<Long, StreamAndRequest>> iterator = pending.entrySet().iterator();
+ int keptRequests = 0, removedRequests = 0;
+ while (iterator.hasNext()) {
+ StreamAndRequest streamAndRequest = checkNotNull(iterator.next().getValue());
+ PendingRequest pendingRequest = streamAndRequest.request;
+ if (!belongsToThisHandler(streamAndRequest)
+ || pendingRequest.getStartTimeMillis() > startTimeRetryThresholdMsec) {
+ ++keptRequests;
+ continue;
Review Comment:

Using `System.currentTimeMillis()` to measure elapsed time or timeouts is
susceptible to system clock adjustments (e.g., NTP syncs): a backward step
delays expiry, and a forward jump can abort commits that have not actually been
retrying for long. Use a monotonic clock such as `System.nanoTime()` for
calculating elapsed time.
```suggestion
long maxRetryNanos = maxRetryDuration.toNanos();
long nowNanos = System.nanoTime();
Iterator<Map.Entry<Long, StreamAndRequest>> iterator = pending.entrySet().iterator();
int keptRequests = 0, removedRequests = 0;
while (iterator.hasNext()) {
StreamAndRequest streamAndRequest = checkNotNull(iterator.next().getValue());
PendingRequest pendingRequest = streamAndRequest.request;
if (!belongsToThisHandler(streamAndRequest)
|| (nowNanos - pendingRequest.getStartTimeNanos()) <= maxRetryNanos) {
++keptRequests;
continue;
}
```
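A minimal, self-contained sketch of the same pruning pattern, assuming each pending entry records its start time from `System.nanoTime()`. Because `nanoTime` values are only meaningful as differences, the check compares `now - start` against the limit (which stays correct even if the counter wraps around) instead of comparing absolute timestamps. The `Map<Long, Long>` of ids to start times and the `pruneExpired` name are hypothetical stand-ins for `pending` and the loop in `onDone`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class ExpiryPruning {

  /**
   * Removes entries whose age (nowNanos - startNanos) exceeds maxRetryNanos and returns how many
   * were removed. Entries within the limit are kept so they can be retried.
   */
  static int pruneExpired(Map<Long, Long> startNanosById, long nowNanos, long maxRetryNanos) {
    int removed = 0;
    Iterator<Map.Entry<Long, Long>> it = startNanosById.entrySet().iterator();
    while (it.hasNext()) {
      long startNanos = it.next().getValue();
      // Compare elapsed time, never absolute nanoTime readings.
      if (nowNanos - startNanos > maxRetryNanos) {
        it.remove(); // removal through the iterator is safe during iteration
        ++removed;
      }
    }
    return removed;
  }

  public static void main(String[] args) {
    long maxRetryNanos = TimeUnit.MINUTES.toNanos(30);
    long now = System.nanoTime();
    Map<Long, Long> pending = new HashMap<>();
    pending.put(1L, now - TimeUnit.MINUTES.toNanos(45)); // older than the limit
    pending.put(2L, now - TimeUnit.MINUTES.toNanos(5)); // still within the limit
    int removed = pruneExpired(pending, now, maxRetryNanos);
    System.out.println(removed + " removed, remaining ids " + pending.keySet()); // 1 removed, remaining ids [2]
  }
}
```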
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -224,14 +228,50 @@ public void onResponse(StreamingCommitResponse response) {
failureHandler.throwIfNonEmpty();
}
- @Override
@SuppressWarnings("ReferenceEquality")
+ private boolean belongsToThisHandler(StreamAndRequest streamAndRequest) {
+ return streamAndRequest.handler == this;
+ }
+
+ @Override
public boolean hasPendingRequests() {
- return pending.entrySet().stream().anyMatch(e -> e.getValue().handler == this);
+ return pending.entrySet().stream().anyMatch(e -> belongsToThisHandler(e.getValue()));
}
@Override
+ @SuppressWarnings("ReferenceEquality")
public void onDone(Status status) {
+ if (maxRetryDuration.compareTo(Duration.ZERO) > 0) {
+ // Remove the requests that have exceeded the retry time so they are not retried.
+ long startTimeRetryThresholdMsec = System.currentTimeMillis() - maxRetryDuration.toMillis();
+ Iterator<Map.Entry<Long, StreamAndRequest>> iterator = pending.entrySet().iterator();
+ int keptRequests = 0, removedRequests = 0;
+ while (iterator.hasNext()) {
+ StreamAndRequest streamAndRequest = checkNotNull(iterator.next().getValue());
+ PendingRequest pendingRequest = streamAndRequest.request;
+ if (!belongsToThisHandler(streamAndRequest)
+ || pendingRequest.getStartTimeMillis() > startTimeRetryThresholdMsec) {
+ ++keptRequests;
+ continue;
+ }
+ ++removedRequests;
+ try {
+ pendingRequest.completeWithStatus(CommitStatus.ABORTED);
+ } catch (RuntimeException e) {
+ // Catch possible exceptions to ensure that an exception for one commit does not prevent
+ // other commits from being processed. Aggregate all the failures to throw after
+ // processing the response if they exist.
Review Comment:

The comment mentions aggregating all failures to throw after processing, but
the catch block only logs a warning and does not aggregate or rethrow any
exceptions. Update the comment to reflect the actual behavior.
```suggestion
// Catch possible exceptions to ensure that an exception for one commit does not prevent
// other commits from being processed.
```
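A small standalone sketch of the behavior the corrected comment describes: each callback runs inside its own `try`/`catch`, so a `RuntimeException` from one commit is logged and the loop continues with the rest, and nothing is rethrown. It uses `java.util.logging` in place of the worker's logger, and the `completeAll` helper with its `Runnable` callbacks is a hypothetical stand-in for `completeWithStatus(CommitStatus.ABORTED)`.

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class IsolatedCompletion {

  private static final Logger LOG = Logger.getLogger(IsolatedCompletion.class.getName());

  /**
   * Runs every callback even if some of them throw. Failures are logged, not rethrown, and the
   * number of failed callbacks is returned.
   */
  static int completeAll(List<Runnable> callbacks) {
    int failures = 0;
    for (Runnable callback : callbacks) {
      try {
        callback.run();
      } catch (RuntimeException e) {
        // One failing callback must not stop the remaining ones from running.
        ++failures;
        LOG.log(Level.WARNING, "Callback failed; continuing with the rest", e);
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    int[] completed = {0};
    List<Runnable> callbacks =
        List.of(
            () -> completed[0]++,
            () -> {
              throw new IllegalStateException("simulated failure");
            },
            () -> completed[0]++);
    int failures = completeAll(callbacks);
    System.out.println(failures + " failed, " + completed[0] + " completed"); // 1 failed, 2 completed
  }
}
```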
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -376,34 +416,46 @@ private void issueMultiChunkRequest(long id, PendingRequest pendingRequest)
}
}
- @AutoValue
- abstract static class PendingRequest {
+ private static class PendingRequest {
+ private final String computationId;
+ private final WorkItemCommitRequest request;
+ private final Consumer<CommitStatus> onDone;
+ private final long startTimeMillis;
- private static PendingRequest create(
+ private PendingRequest(
String computationId, WorkItemCommitRequest request,
Consumer<CommitStatus> onDone) {
- return new AutoValue_GrpcCommitWorkStream_PendingRequest(computationId, request, onDone);
+ this.computationId = computationId;
+ this.request = request;
+ this.onDone = onDone;
+ this.startTimeMillis = System.currentTimeMillis();
}
- abstract String computationId();
+ String getComputationId() {
+ return computationId;
+ }
- abstract WorkItemCommitRequest request();
+ WorkItemCommitRequest getRequest() {
+ return request;
+ }
- abstract Consumer<CommitStatus> onDone();
+ long getStartTimeMillis() {
+ return startTimeMillis;
+ }
Review Comment:

To support monotonic time tracking using `System.nanoTime()`, update
`PendingRequest` to store and expose `startTimeNanos` instead of
`startTimeMillis`.
```java
private final long startTimeNanos;
private PendingRequest(
String computationId, WorkItemCommitRequest request,
Consumer<CommitStatus> onDone) {
this.computationId = computationId;
this.request = request;
this.onDone = onDone;
this.startTimeNanos = System.nanoTime();
}
String getComputationId() {
return computationId;
}
WorkItemCommitRequest getRequest() {
return request;
}
long getStartTimeNanos() {
return startTimeNanos;
}
```
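One option for keeping the nanosecond bookkeeping testable, sketched as an assumption rather than something this PR proposes: capture the start time from an injected `LongSupplier` that defaults to `System::nanoTime`, and expose the age as a `Duration` computed from a difference of clock readings. `TimedRequest` and `age()` are hypothetical names, and a `String` payload stands in for `WorkItemCommitRequest`.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

public class TimedRequest {

  private final String payload;
  private final LongSupplier nanoClock;
  private final long startTimeNanos;

  /** Production constructor: uses the JVM's monotonic clock. */
  TimedRequest(String payload) {
    this(payload, System::nanoTime);
  }

  /** Test-friendly constructor: the clock can be replaced with a fake. */
  TimedRequest(String payload, LongSupplier nanoClock) {
    this.payload = payload;
    this.nanoClock = nanoClock;
    this.startTimeNanos = nanoClock.getAsLong();
  }

  String getPayload() {
    return payload;
  }

  long getStartTimeNanos() {
    return startTimeNanos;
  }

  /** Elapsed time since construction, computed as a difference of clock readings. */
  Duration age() {
    return Duration.ofNanos(nanoClock.getAsLong() - startTimeNanos);
  }

  public static void main(String[] args) {
    long[] fakeNanos = {1_000L};
    TimedRequest request = new TimedRequest("commit-1", () -> fakeNanos[0]);
    fakeNanos[0] += Duration.ofMinutes(31).toNanos();
    // true: 31 minutes is past a 30 minute retry limit.
    System.out.println(request.age().compareTo(Duration.ofMinutes(30)) > 0);
  }
}
```

Injecting the clock lets a unit test advance time deterministically instead of sleeping; whether that is worth the extra constructor is a judgment call for the author.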
--
This is an automated message from the Apache Git Service.