This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 63501f33968 Fix data_race condition in WindmillStreamSenderTest
(#38589)
63501f33968 is described below
commit 63501f339684d580dc38574c105dda1a10e2740b
Author: parveensania <[email protected]>
AuthorDate: Fri May 22 07:22:33 2026 -0700
Fix data_race condition in WindmillStreamSenderTest (#38589)
* Memoize the builder config instead of the constructed Backoff
instance
* fix indentation
* indentation fix
---
.../worker/windmill/client/grpc/GrpcWindmillStreamFactory.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
index 244d2ad3fa1..0184b88d53c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
@@ -117,13 +117,13 @@ public class GrpcWindmillStreamFactory implements
StatusDataProvider {
this.streamingRpcBatchLimit = streamingRpcBatchLimit;
this.windmillMessagesBetweenIsReadyChecks =
windmillMessagesBetweenIsReadyChecks;
// Configure backoff to retry calls forever, with a maximum sane retry
interval.
- this.grpcBackOff =
+ Supplier<FluentBackoff> backoffConfig =
Suppliers.memoize(
() ->
FluentBackoff.DEFAULT
.withInitialBackoff(MIN_BACKOFF)
- .withMaxBackoff(maxBackOffSupplier.get())
- .backoff());
+ .withMaxBackoff(maxBackOffSupplier.get()));
+ this.grpcBackOff = () -> backoffConfig.get().backoff();
this.streamRegistry = ConcurrentHashMap.newKeySet();
this.sendKeyedGetDataRequests = sendKeyedGetDataRequests;
this.requestBatchedGetWorkResponse = requestBatchedGetWorkResponse;