acrites commented on code in PR #29963:
URL: https://github.com/apache/beam/pull/29963#discussion_r1456452871
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1873,14 +1915,20 @@ private void sendWorkerUpdatesToDataflowService(
*/
private void refreshActiveWork() {
Map<String, List<Windmill.KeyedGetDataRequest>> active = new HashMap<>();
+ Map<String, List<Windmill.HeartbeatRequest>> heartbeats = new HashMap<>();
Instant refreshDeadline =
clock.get().minus(Duration.millis(options.getActiveWorkRefreshPeriodMillis()));
     for (Map.Entry<String, ComputationState> entry : computationMap.entrySet()) {
-      active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline, sampler));
+      if (windmillServiceEnabled
+          && DataflowRunner.hasExperiment(options, "send_new_heartbeat_requests")) {
+        heartbeats.put(entry.getKey(), entry.getValue().getKeyHeartbeats(refreshDeadline, sampler));
+      } else {
+        active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline, sampler));
Review Comment:
Are you suggesting we plumb the `computationMap` down into the RPC layer
instead? Otherwise, I could build a map consisting of a new type of object
(maybe `ComputationGetDataRequest`, since that contains both types of
heartbeats, or maybe a new object entirely) and pass that down into the RPC
layer. In one case we need the user key and in the other we need the cache
token, but the rest of the data is the same between the two.
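To make the second option concrete, here is a rough sketch of what such an intermediate type could look like. All names and fields here are hypothetical illustrations, not the PR's actual API: the idea is a single per-computation map whose entries carry either the user key (legacy `KeyedGetDataRequest` path) or the cache token (new `HeartbeatRequest` path), with the shared fields factored out, so only this one map needs to cross into the RPC layer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical unified refresh entry. Exactly one of userKey/cacheToken is
// set, depending on which heartbeat style the experiment selects; the
// remaining fields are common to both.
final class HeartbeatEntry {
  final long shardingKey;
  final long workToken;
  final byte[] userKey;  // set on the legacy KeyedGetDataRequest path
  final Long cacheToken; // set on the new HeartbeatRequest path

  private HeartbeatEntry(long shardingKey, long workToken, byte[] userKey, Long cacheToken) {
    this.shardingKey = shardingKey;
    this.workToken = workToken;
    this.userKey = userKey;
    this.cacheToken = cacheToken;
  }

  static HeartbeatEntry forUserKey(long shardingKey, long workToken, byte[] userKey) {
    return new HeartbeatEntry(shardingKey, workToken, userKey, null);
  }

  static HeartbeatEntry forCacheToken(long shardingKey, long workToken, long cacheToken) {
    return new HeartbeatEntry(shardingKey, workToken, null, cacheToken);
  }

  boolean usesCacheToken() {
    return cacheToken != null;
  }
}

public class RefreshSketch {
  public static void main(String[] args) {
    // One map of computation id -> entries, mirroring the two maps built in
    // refreshActiveWork(); the RPC layer would branch on usesCacheToken().
    Map<String, List<HeartbeatEntry>> perComputation = new HashMap<>();
    perComputation
        .computeIfAbsent("computation-1", k -> new ArrayList<>())
        .add(HeartbeatEntry.forCacheToken(1L, 42L, 7L));
    perComputation
        .computeIfAbsent("computation-1", k -> new ArrayList<>())
        .add(HeartbeatEntry.forUserKey(2L, 43L, new byte[] {0x01}));

    for (HeartbeatEntry e : perComputation.get("computation-1")) {
      System.out.println(e.usesCacheToken() ? "heartbeat" : "getdata");
    }
  }
}
```

The trade-off this sketch illustrates: the RPC layer never sees `computationMap` itself, only a plain data map, at the cost of one more value type.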
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]