m-trieu commented on code in PR #30764:
URL: https://github.com/apache/beam/pull/30764#discussion_r1575749864


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -90,11 +83,11 @@ private ActiveWorkState(
   }
 
   static ActiveWorkState create(WindmillStateCache.ForComputation computationStateCache) {
-    return new ActiveWorkState(new HashMap<>(), computationStateCache);
+    return new ActiveWorkState(new ConcurrentHashMap<>(), computationStateCache);

Review Comment:
   Yes, this was unintended; changed it back.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -321,11 +291,8 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
     return stuckCommits.build();
   }
 
-  synchronized ImmutableList<HeartbeatRequest> getKeyHeartbeats(
-      Instant refreshDeadline, DataflowExecutionStateSampler sampler) {
-    return activeWork.entrySet().stream()
-        .flatMap(entry -> toHeartbeatRequestStream(entry, refreshDeadline, sampler))
-        .collect(toImmutableList());
+  synchronized ImmutableMap<ShardedKey, Deque<Work>> getReadOnlyActiveWork() {
+    return ImmutableMap.copyOf(activeWork);

Review Comment:
   Is it important that the refresh be completely synchronous? How about
   building and returning a `Map<ShardedKey, List<Work>>` and refreshing from
   that copy instead?

   That way only the creation of the map is synchronized, and the active work
   refresher can operate without holding the lock.
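
   A minimal sketch of that suggestion, under stated assumptions: the class
   `ActiveWorkSnapshotSketch` and the methods `activate` and
   `snapshotActiveWork` are hypothetical, and `String` and `Long` stand in for
   `ShardedKey` and `Work`. It is not the code in this PR. The sketch copies
   each queue as well as the map, since a shallow copy of the map would still
   share the mutable `Deque` values with the locked state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ActiveWorkState; String and Long replace ShardedKey and Work.
final class ActiveWorkSnapshotSketch {
  // Guarded by the instance lock (all access goes through synchronized methods).
  private final Map<String, Deque<Long>> activeWork = new HashMap<>();

  // Queues a work item for a key while holding the lock.
  synchronized void activate(String key, long workToken) {
    activeWork.computeIfAbsent(key, unused -> new ArrayDeque<>()).addLast(workToken);
  }

  // Only building the copy holds the lock. Each queue is copied into its own
  // list, so later mutations of activeWork are not visible through the snapshot.
  synchronized Map<String, List<Long>> snapshotActiveWork() {
    Map<String, List<Long>> snapshot = new HashMap<>();
    for (Map.Entry<String, Deque<Long>> entry : activeWork.entrySet()) {
      snapshot.put(
          entry.getKey(), Collections.unmodifiableList(new ArrayList<>(entry.getValue())));
    }
    return Collections.unmodifiableMap(snapshot);
  }
}
```

   A refresher could then iterate over the returned map without taking the
   lock, at the cost of working from a view that may be slightly stale by the
   time heartbeats are sent.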



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.