scwhittle commented on code in PR #29963:
URL: https://github.com/apache/beam/pull/29963#discussion_r1457283031
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1873,14 +1915,20 @@ private void sendWorkerUpdatesToDataflowService(
*/
private void refreshActiveWork() {
Map<String, List<Windmill.KeyedGetDataRequest>> active = new HashMap<>();
+ Map<String, List<Windmill.HeartbeatRequest>> heartbeats = new HashMap<>();
Instant refreshDeadline =
clock.get().minus(Duration.millis(options.getActiveWorkRefreshPeriodMillis()));
for (Map.Entry<String, ComputationState> entry : computationMap.entrySet()) {
- active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline, sampler));
+ if (windmillServiceEnabled
+ && DataflowRunner.hasExperiment(options, "send_new_heartbeat_requests")) {
+ heartbeats.put(entry.getKey(), entry.getValue().getKeyHeartbeats(refreshDeadline, sampler));
+ } else {
+ active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline, sampler));
Review Comment:
I don't want to plumb computationMap through, as it is rather complex. I forgot that KeyedGetDataRequest doesn't have the cache_token.
So I think you could switch to the new heartbeat format at this level and pass that down to the RPC layer, which can convert back to the legacy heartbeat format if it is configured to. In both the previous and the new heartbeat formats, we don't want to send the user `key`: it can be large enough to push us over the request size limit, and Windmill doesn't need it. So the new proto should carry sufficient information for both cases.
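A minimal sketch of the suggested split, assuming hypothetical stand-in types rather than the real Windmill protos (`HeartbeatRequest`, `LegacyHeartbeat`, `toLegacy`, and `encodeForRpc` are illustrative names, and their field sets are assumptions): the worker builds only new-format heartbeats, which omit the user key, and the RPC layer drops the cache token to produce legacy-format requests when it is configured to.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: heartbeats are built once in the new format and converted to the
 * legacy format only at the RPC layer. These types are stand-ins, not the Windmill protos.
 */
final class HeartbeatSketch {

  /** New-format heartbeat: identifies the work item without the (possibly large) user key. */
  record HeartbeatRequest(long shardingKey, long workToken, long cacheToken) {}

  /** Legacy-format heartbeat: like KeyedGetDataRequest it has no cache token; the user key is also omitted. */
  record LegacyHeartbeat(long shardingKey, long workToken) {}

  /** Converts new-format heartbeats to the legacy format by dropping the cache token. */
  static List<LegacyHeartbeat> toLegacy(List<HeartbeatRequest> heartbeats) {
    List<LegacyHeartbeat> legacy = new ArrayList<>(heartbeats.size());
    for (HeartbeatRequest hb : heartbeats) {
      legacy.add(new LegacyHeartbeat(hb.shardingKey(), hb.workToken()));
    }
    return legacy;
  }

  /**
   * RPC-layer entry point: passes new-format heartbeats through unchanged, or converts them
   * when the stream is configured for the legacy format.
   */
  static List<?> encodeForRpc(List<HeartbeatRequest> heartbeats, boolean useNewHeartbeatFormat) {
    return useNewHeartbeatFormat ? heartbeats : toLegacy(heartbeats);
  }
}
```

Keeping the conversion at the RPC boundary means the worker-side code that walks computationMap only ever deals with one heartbeat format.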
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1864,6 +1890,22 @@ private void sendWorkerUpdatesToDataflowService(
}
}
+ public void handleHeartbeatResponses(List<Windmill.ComputationHeartbeatResponse> responses) {
+ for (Windmill.ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
+ Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
+ for (Windmill.HeartbeatResponse heartbeatResponse :
+ computationHeartbeatResponse.getHeartbeatResponsesList()) {
+ failedWork.putIfAbsent(heartbeatResponse.getShardingKey(), new ArrayList<>());
+ failedWork
+ .get(heartbeatResponse.getShardingKey())
+ .add(
+ new FailedTokens(
+ heartbeatResponse.getWorkToken(), heartbeatResponse.getCacheToken()));
+ }
+ computationMap.get(computationHeartbeatResponse.getComputationId()).failWork(failedWork);
Review Comment:
I was more worried that the id we're looking up comes from the RPC response and could be incorrect or corrupted, causing a crash here. So it seems better to do the lookup and ignore the heartbeat response if the computation is not in the map (i.e. `get` returns null).
Because Java is garbage collected, once `get` returns non-null you don't have to worry about that reference becoming invalid if the entry is later removed from the map (though we also don't remove from this map).
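A minimal sketch of the defensive lookup, using hypothetical stand-ins for the Windmill response protos and `ComputationState` (the record names, fields, and the `failed` bookkeeping are assumptions for illustration): a computation id that is not in the map is skipped instead of dereferencing null, and `computeIfAbsent` replaces the `putIfAbsent` + `get` pair.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; all types are stand-ins, not the real Windmill or worker classes. */
final class HeartbeatResponseSketch {

  record FailedTokens(long workToken, long cacheToken) {}

  record HeartbeatResponse(long shardingKey, long workToken, long cacheToken) {}

  record ComputationHeartbeatResponse(String computationId, List<HeartbeatResponse> responses) {}

  /** Stand-in for ComputationState: accumulates the work it was asked to fail. */
  static final class ComputationState {
    final Map<Long, List<FailedTokens>> failed = new HashMap<>();

    void failWork(Map<Long, List<FailedTokens>> failedWork) {
      failedWork.forEach((key, tokens) -> failed.computeIfAbsent(key, k -> new ArrayList<>()).addAll(tokens));
    }
  }

  static void handleHeartbeatResponses(
      Map<String, ComputationState> computationMap, List<ComputationHeartbeatResponse> responses) {
    for (ComputationHeartbeatResponse computationResponse : responses) {
      // The id comes from the RPC response and may be unknown or corrupted: skip it rather than crash.
      ComputationState state = computationMap.get(computationResponse.computationId());
      if (state == null) {
        continue;
      }
      // Group failed work by sharding key; computeIfAbsent creates each list on first use.
      Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
      for (HeartbeatResponse r : computationResponse.responses()) {
        failedWork
            .computeIfAbsent(r.shardingKey(), k -> new ArrayList<>())
            .add(new FailedTokens(r.workToken(), r.cacheToken()));
      }
      // 'state' is a strong reference, so the object stays usable even if its map entry were removed.
      state.failWork(failedWork);
    }
  }
}
```

Looking up the computation before grouping also avoids building `failedWork` for responses that would be discarded anyway.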
--
This is an automated message from the Apache Git Service.