m-trieu commented on code in PR #30764:
URL: https://github.com/apache/beam/pull/30764#discussion_r1575749864
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -90,11 +83,11 @@ private ActiveWorkState(
}
static ActiveWorkState create(WindmillStateCache.ForComputation computationStateCache) {
- return new ActiveWorkState(new HashMap<>(), computationStateCache);
+ return new ActiveWorkState(new ConcurrentHashMap<>(), computationStateCache);
Review Comment:
Yeah, this was unintended; changed it back.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -321,11 +291,8 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
return stuckCommits.build();
}
- synchronized ImmutableList<HeartbeatRequest> getKeyHeartbeats(
- Instant refreshDeadline, DataflowExecutionStateSampler sampler) {
- return activeWork.entrySet().stream()
- .flatMap(entry -> toHeartbeatRequestStream(entry, refreshDeadline, sampler))
- .collect(toImmutableList());
+ synchronized ImmutableMap<ShardedKey, Deque<Work>> getReadOnlyActiveWork() {
+ return ImmutableMap.copyOf(activeWork);
Review Comment:
Is it important that the whole refresh runs under the lock? How about
returning a Map<ShardedKey, List<Work>> that we build, and then refreshing
based on that? That way only the creation of the map is synchronized, and the
active work refresher can operate without locking.
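A minimal sketch of the snapshot approach suggested above, assuming simplified types: `ActiveWorkSnapshotSketch` is a hypothetical stand-in for `ActiveWorkState`, and `String` replaces both `ShardedKey` and `Work`. The lock is held only while each `Deque` is copied into an immutable `List`, so a refresher can iterate the returned map without synchronizing. Note that `ImmutableMap.copyOf(activeWork)` in the diff copies only the map itself; its `Deque<Work>` values are still the live deques that the state mutates under its lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ActiveWorkState; ShardedKey and Work are simplified to String.
final class ActiveWorkSnapshotSketch {
  // Guarded by "this", mirroring the plain HashMap that create() passes in.
  private final Map<String, Deque<String>> activeWork = new HashMap<>();

  synchronized void activate(String key, String work) {
    activeWork.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(work);
  }

  // Removes the head work item for the key and drops the key once its queue is empty.
  synchronized void complete(String key) {
    Deque<String> queue = activeWork.get(key);
    if (queue != null) {
      queue.pollFirst();
      if (queue.isEmpty()) {
        activeWork.remove(key);
      }
    }
  }

  // Only the copy happens under the lock. Each live Deque is copied into an
  // immutable List, so callers never observe later mutations of the deques.
  synchronized Map<String, List<String>> getReadOnlyActiveWork() {
    Map<String, List<String>> snapshot = new HashMap<>();
    for (Map.Entry<String, Deque<String>> entry : activeWork.entrySet()) {
      snapshot.put(entry.getKey(), List.copyOf(entry.getValue()));
    }
    return Map.copyOf(snapshot);
  }
}
```

The trade-off is an O(total active work) copy per refresh while holding the lock, in exchange for letting the refresher build heartbeats from the snapshot with no locking at all.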