scwhittle commented on code in PR #29963:
URL: https://github.com/apache/beam/pull/29963#discussion_r1457283031


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1873,14 +1915,20 @@ private void sendWorkerUpdatesToDataflowService(
    */
   private void refreshActiveWork() {
     Map<String, List<Windmill.KeyedGetDataRequest>> active = new HashMap<>();
+    Map<String, List<Windmill.HeartbeatRequest>> heartbeats = new HashMap<>();
     Instant refreshDeadline =
         clock.get().minus(Duration.millis(options.getActiveWorkRefreshPeriodMillis()));
 
     for (Map.Entry<String, ComputationState> entry : computationMap.entrySet()) {
-      active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline, sampler));
+      if (windmillServiceEnabled
+          && DataflowRunner.hasExperiment(options, "send_new_heartbeat_requests")) {
+        heartbeats.put(entry.getKey(), entry.getValue().getKeyHeartbeats(refreshDeadline, sampler));
+      } else {
+        active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline, sampler));

Review Comment:
   I don't want to plumb computationMap down, as it is rather complex. I forgot that KeyedGetDataRequest doesn't have the cache_token.
   
   So I think you could switch to the new heartbeat format at this level and pass that down to the RPC layer, which can convert back to the legacy heartbeat format if it is configured to. In both the previous and new heartbeat formats, I think we don't want to send the user `key`: it can be large, it can cause us to exceed the request size limit, and it is unnecessary in Windmill. So the new proto should carry sufficient information for both cases.
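A minimal sketch of the suggested layering, assuming simplified stand-in types: `NewHeartbeat`, `LegacyHeartbeat`, and `toLegacy` are hypothetical and are not the real Windmill protos. Heartbeats are built once in the new format, and the RPC layer would call the converter only when it is configured for the legacy format. Neither shape carries the user key, and the legacy shape drops the cache token because KeyedGetDataRequest has no field for it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; these are simplified stand-ins, not the real Windmill protos. */
public class HeartbeatConversionSketch {

  /** Hypothetical new-format heartbeat. The user key is intentionally absent. */
  static final class NewHeartbeat {
    final long shardingKey;
    final long workToken;
    final long cacheToken;

    NewHeartbeat(long shardingKey, long workToken, long cacheToken) {
      this.shardingKey = shardingKey;
      this.workToken = workToken;
      this.cacheToken = cacheToken;
    }
  }

  /** Hypothetical legacy-format heartbeat: no cache token and, again, no user key. */
  static final class LegacyHeartbeat {
    final long shardingKey;
    final long workToken;

    LegacyHeartbeat(long shardingKey, long workToken) {
      this.shardingKey = shardingKey;
      this.workToken = workToken;
    }
  }

  /**
   * Converts per-computation heartbeats to the legacy shape. The RPC layer would call this only
   * when configured for the legacy format; otherwise it sends the new format unchanged.
   */
  static Map<String, List<LegacyHeartbeat>> toLegacy(Map<String, List<NewHeartbeat>> heartbeats) {
    Map<String, List<LegacyHeartbeat>> legacy = new HashMap<>();
    for (Map.Entry<String, List<NewHeartbeat>> entry : heartbeats.entrySet()) {
      List<LegacyHeartbeat> converted = new ArrayList<>();
      for (NewHeartbeat heartbeat : entry.getValue()) {
        // The cache token is dropped because the legacy request has no field for it.
        converted.add(new LegacyHeartbeat(heartbeat.shardingKey, heartbeat.workToken));
      }
      legacy.put(entry.getKey(), converted);
    }
    return legacy;
  }
}
```

With this split, the worker loop never needs to branch on the experiment flag; only the RPC layer knows which wire format is in use.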



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1864,6 +1890,22 @@ private void sendWorkerUpdatesToDataflowService(
     }
   }
 
+  public void handleHeartbeatResponses(List<Windmill.ComputationHeartbeatResponse> responses) {
+    for (Windmill.ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
+      Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
+      for (Windmill.HeartbeatResponse heartbeatResponse :
+          computationHeartbeatResponse.getHeartbeatResponsesList()) {
+        failedWork.putIfAbsent(heartbeatResponse.getShardingKey(), new ArrayList<>());
+        failedWork
+            .get(heartbeatResponse.getShardingKey())
+            .add(
+                new FailedTokens(
+                    heartbeatResponse.getWorkToken(), heartbeatResponse.getCacheToken()));
+      }
+      computationMap.get(computationHeartbeatResponse.getComputationId()).failWork(failedWork);

Review Comment:
   I was more worried that the ID we're looking up comes from the RPC response and could be incorrect or corrupted, causing a NullPointerException here. So it seems better to do the lookup first and ignore the heartbeat response if the computation is not in the map (i.e., `get` returns null).
   
   Because Java is garbage collected, if `get` returns non-null you don't have to worry about that object becoming invalid if it is later removed from the map (and we don't remove from this map anyway).
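A minimal sketch of the null-safe handling suggested above, assuming simplified stand-in types: `HeartbeatResponse`, `ComputationResponse`, `Computation`, and `handle` are hypothetical and are not the real Windmill protos or ComputationState. The computation is looked up before any work is grouped, and an unknown ID is skipped rather than dereferenced. The sketch returns the count of ignored responses only so the behavior is observable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; types are simplified stand-ins, not the real worker classes. */
public class HeartbeatResponseSketch {

  static final class FailedTokens {
    final long workToken;
    final long cacheToken;

    FailedTokens(long workToken, long cacheToken) {
      this.workToken = workToken;
      this.cacheToken = cacheToken;
    }
  }

  static final class HeartbeatResponse {
    final long shardingKey;
    final long workToken;
    final long cacheToken;

    HeartbeatResponse(long shardingKey, long workToken, long cacheToken) {
      this.shardingKey = shardingKey;
      this.workToken = workToken;
      this.cacheToken = cacheToken;
    }
  }

  static final class ComputationResponse {
    final String computationId;
    final List<HeartbeatResponse> responses;

    ComputationResponse(String computationId, List<HeartbeatResponse> responses) {
      this.computationId = computationId;
      this.responses = responses;
    }
  }

  /** Stand-in for ComputationState that records the work it was asked to fail. */
  static final class Computation {
    final Map<Long, List<FailedTokens>> failed = new HashMap<>();

    void failWork(Map<Long, List<FailedTokens>> failedWork) {
      failedWork.forEach((k, v) -> failed.computeIfAbsent(k, x -> new ArrayList<>()).addAll(v));
    }
  }

  /** Returns how many responses were ignored because their computation ID was unknown. */
  static int handle(Map<String, Computation> computationMap, List<ComputationResponse> responses) {
    int ignored = 0;
    for (ComputationResponse response : responses) {
      // The ID comes from the RPC response and may be wrong, so look it up before using it.
      Computation computation = computationMap.get(response.computationId);
      if (computation == null) {
        ignored++;
        continue; // Unknown computation: skip instead of throwing a NullPointerException.
      }
      Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
      for (HeartbeatResponse heartbeat : response.responses) {
        // computeIfAbsent does the putIfAbsent + get pair in a single map operation.
        failedWork
            .computeIfAbsent(heartbeat.shardingKey, k -> new ArrayList<>())
            .add(new FailedTokens(heartbeat.workToken, heartbeat.cacheToken));
      }
      computation.failWork(failedWork);
    }
    return ignored;
  }
}
```

Doing the lookup first also avoids grouping tokens for a computation that will be skipped anyway.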



-- 
This is an automated message from the Apache Git Service.