scwhittle commented on code in PR #29963:
URL: https://github.com/apache/beam/pull/29963#discussion_r1450262899


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java:
##########
@@ -239,19 +240,23 @@ public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request)
   }
 
   /** Tells windmill processing is ongoing for the given keys. */

Review Comment:
   Can you update the comment? It's unclear what heartbeats are.
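   Something along these lines, for example (the wording is just a suggestion, not from this PR):
   ```java
   /**
    * Sends heartbeats to Windmill for the given keys, telling the backend that
    * processing is still ongoing so the work items are not considered stuck.
    */
   ```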



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1363,6 +1385,10 @@ private void commitLoop() {
   // Adds the commit to the commitStream if it fits, returning true iff it is consumed.
   private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) {
     Preconditions.checkNotNull(commit);
+    // Drop commits for failed work. Such commits will be dropped by Windmill anyway.
+    if (commit.work().isFailed()) {
+      return true;

Review Comment:
   Should we count these to show on the status page?
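   For example, a rough sketch (the counter name and wiring are illustrative, not from this PR):
   ```java
   // java.util.concurrent.atomic.AtomicLong; counts commits dropped for failed work.
   private final AtomicLong failedWorkCommitsDropped = new AtomicLong();

   private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) {
     Preconditions.checkNotNull(commit);
     // Drop commits for failed work; Windmill would drop them anyway.
     if (commit.work().isFailed()) {
       failedWorkCommitsDropped.incrementAndGet(); // could be surfaced on the status page
       return true;
     }
     // ... existing logic that adds the commit to commitStream ...
     return false;
   }
   ```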



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -120,6 +123,42 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
     return ActivateWorkResult.QUEUED;
   }
 
+  public static class FailedTokens {
+    public long workToken;
+    public long cacheToken;
+
+    public FailedTokens(long workToken, long cacheToken) {
+      this.workToken = workToken;
+      this.cacheToken = cacheToken;
+    }
+  }
+
+  synchronized void failWorkForKey(Map<Long, List<FailedTokens>> failedWork) {
+    // Note we can't construct a ShardedKey and look it up in activeWork directly since
+    // HeartbeatResponse doesn't include the user key.
+    for (Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
+      List<FailedTokens> failedTokens = failedWork.get(entry.getKey().shardingKey());
+      if (failedTokens == null) continue;
+      for (FailedTokens failedToken : failedTokens) {
+        for (Work queuedWork : entry.getValue()) {
+          WorkItem workItem = queuedWork.getWorkItem();
+          if (workItem.getWorkToken() == failedToken.workToken
+              && workItem.getCacheToken() == failedToken.cacheToken) {
+            LOG.error(

Review Comment:
   Warning/info? Users don't like errors.

   We've also added reassuring text like "This happens during autoscaling and backend events; the data will be reprocessed and not lost" in other cases, and we might want something like that here too.
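   Roughly (the message text is illustrative):
   ```java
   LOG.info(
       "Failing work for sharding key {} with work token {} and cache token {}. "
           + "This is expected during autoscaling and other backend events; "
           + "the work will be retried and no data is lost.",
       entry.getKey().shardingKey(),
       workItem.getWorkToken(),
       workItem.getCacheToken());
   ```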



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -192,11 +205,35 @@ public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> active) {
                 .addRequests(request));
       }
     }
+    for (Map.Entry<String, List<HeartbeatRequest>> entry : heartbeats.entrySet()) {
+      for (HeartbeatRequest request : entry.getValue()) {
+        // Calculate the bytes with some overhead for proto encoding.
+        long bytes = (long) entry.getKey().length() + request.getSerializedSize() + 10;
+        if (builderBytes > 0
+            && (builderBytes + bytes > AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE
+                || builder.getRequestIdCount() >= streamingRpcBatchLimit)) {

Review Comment:
   We don't need the batch limit check here; we're not adding request IDs for heartbeats.
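   i.e. the flush condition could just be:
   ```java
   if (builderBytes > 0
       && builderBytes + bytes > AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE) {
     send(builder.build());
     builderBytes = 0;
     builder.clear();
   }
   ```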



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -192,11 +205,35 @@ public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> active) {
                 .addRequests(request));
       }
     }
+    for (Map.Entry<String, List<HeartbeatRequest>> entry : heartbeats.entrySet()) {
+      for (HeartbeatRequest request : entry.getValue()) {
+        // Calculate the bytes with some overhead for proto encoding.
+        long bytes = (long) entry.getKey().length() + request.getSerializedSize() + 10;
+        if (builderBytes > 0
+            && (builderBytes + bytes > AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE
+                || builder.getRequestIdCount() >= streamingRpcBatchLimit)) {
+          send(builder.build());
+          builderBytes = 0;
+          builder.clear();
+        }
+        builderBytes += bytes;
+        builder.addComputationHeartbeatRequest(
+            ComputationHeartbeatRequest.newBuilder()
+                .setComputationId(entry.getKey())

Review Comment:
   This adds a new ComputationHeartbeatRequest for each heartbeat; we should have just one per computation, with the many heartbeats inside it (add a test to catch that).
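   A sketch (assuming ComputationHeartbeatRequest has a repeated heartbeat_requests field, by analogy with the response proto; chunked flushing omitted for brevity):
   ```java
   for (Map.Entry<String, List<HeartbeatRequest>> entry : heartbeats.entrySet()) {
     // One ComputationHeartbeatRequest per computation, carrying all of its heartbeats.
     ComputationHeartbeatRequest.Builder computationHeartbeat =
         ComputationHeartbeatRequest.newBuilder().setComputationId(entry.getKey());
     for (HeartbeatRequest request : entry.getValue()) {
       computationHeartbeat.addHeartbeatRequests(request);
     }
     builder.addComputationHeartbeatRequest(computationHeartbeat.build());
   }
   ```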



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -120,6 +123,42 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
     return ActivateWorkResult.QUEUED;
   }
 
+  public static class FailedTokens {
+    public long workToken;
+    public long cacheToken;
+
+    public FailedTokens(long workToken, long cacheToken) {
+      this.workToken = workToken;
+      this.cacheToken = cacheToken;
+    }
+  }
+
+  synchronized void failWorkForKey(Map<Long, List<FailedTokens>> failedWork) {
+    // Note we can't construct a ShardedKey and look it up in activeWork directly since

Review Comment:
   We could change the activeWork map to a multimap keyed by sharding_key. Lookup would have to find all entries with the same sharding_key and compare the user key, but we don't expect collisions.

   We could also keep a map keyed by sharding_key and use the existing queue. We'd want to change the queueing logic to support possibly different keys, though. But I don't think serializing processing in the case of such a collision is a real problem.
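   A sketch of the multimap variant (uses Guava; simplified, and it ignores the per-key queueing the real map provides; work.setFailed() is an assumed setter matching the isFailed() used elsewhere):
   ```java
   // Keyed by sharding key instead of ShardedKey; entries with the same sharding
   // key but different user keys would coexist in the same collection.
   private final Multimap<Long, Work> activeWork = ArrayListMultimap.create();

   synchronized void failWorkForKey(Map<Long, List<FailedTokens>> failedWork) {
     for (Map.Entry<Long, List<FailedTokens>> entry : failedWork.entrySet()) {
       // Direct lookup by sharding key; no scan over all active keys needed.
       for (Work work : activeWork.get(entry.getKey())) {
         WorkItem workItem = work.getWorkItem();
         for (FailedTokens tokens : entry.getValue()) {
           if (workItem.getWorkToken() == tokens.workToken
               && workItem.getCacheToken() == tokens.cacheToken) {
             work.setFailed();
           }
         }
       }
     }
   }
   ```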



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1864,6 +1890,22 @@ private void sendWorkerUpdatesToDataflowService(
     }
   }
 
+  public void handleHeartbeatResponses(List<Windmill.ComputationHeartbeatResponse> responses) {
+    for (Windmill.ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
+      Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
+      for (Windmill.HeartbeatResponse heartbeatResponse :
+          computationHeartbeatResponse.getHeartbeatResponsesList()) {
+        failedWork.putIfAbsent(heartbeatResponse.getShardingKey(), new ArrayList<>());
+        failedWork
+            .get(heartbeatResponse.getShardingKey())
+            .add(
+                new FailedTokens(
+                    heartbeatResponse.getWorkToken(), heartbeatResponse.getCacheToken()));
+      }
+      computationMap.get(computationHeartbeatResponse.getComputationId()).failWork(failedWork);

Review Comment:
   Could get a NullPointerException here if the computation ID isn't in the map.
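   e.g. (sketch):
   ```java
   ComputationState computationState =
       computationMap.get(computationHeartbeatResponse.getComputationId());
   if (computationState != null) {
     computationState.failWork(failedWork);
   } // else: the computation is no longer known; drop the stale heartbeat response.
   ```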



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -150,9 +159,13 @@ public void start(
       @Nullable Instant synchronizedProcessingTime,
       WindmillStateReader stateReader,
       SideInputStateFetcher sideInputStateFetcher,
-      Windmill.WorkItemCommitRequest.Builder outputBuilder) {
+      Windmill.WorkItemCommitRequest.Builder outputBuilder,
+      @Nullable Supplier<Boolean> workFailed) {
     this.key = key;
     this.work = work;
+    if (workFailed != null) {
+      this.workIsFailed = workFailed;
+    }

Review Comment:
   This should clear the existing value in the else case; otherwise it could be examining the previous work item's failure state.
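   e.g. (sketch; the never-failed default is an assumption):
   ```java
   if (workFailed != null) {
     this.workIsFailed = workFailed;
   } else {
     // Reset so we never consult the previous work item's failure state.
     this.workIsFailed = () -> Boolean.FALSE;
   }
   ```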



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -946,6 +953,10 @@ private void process(
     final Windmill.WorkItem workItem = work.getWorkItem();
     final String computationId = computationState.getComputationId();
     final ByteString key = workItem.getKey();
+    if (work.isFailed()) {
+      LOG.debug("Not processing failed work for {}:\n{}", computationId, work);
+      return;

Review Comment:
   I think just returning will leave things in a possibly odd state. I think that computationState.completeWorkAndScheduleNextWorkForKey(...) needs to be called, at least.

   Perhaps we could move this down to just where we would do the execution, and throw the special exception there instead of executing. I don't think the work saved by not getting to that point is that expensive, and it makes the failure path consistent with existing processing errors.
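   Roughly (sketch; reuses the WorkItemFailedException introduced elsewhere in this PR, and the message text is illustrative):
   ```java
   // Just before executing the work item, instead of returning early at the top:
   if (work.isFailed()) {
     // Flows through the existing processing-error handling, so
     // completeWorkAndScheduleNextWorkForKey etc. still run.
     throw new WorkItemFailedException("Windmill reported the work item as failed.");
   }
   ```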



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -120,6 +123,42 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
     return ActivateWorkResult.QUEUED;
   }
 
+  public static class FailedTokens {
+    public long workToken;
+    public long cacheToken;
+
+    public FailedTokens(long workToken, long cacheToken) {
+      this.workToken = workToken;
+      this.cacheToken = cacheToken;
+    }
+  }
+
+  synchronized void failWorkForKey(Map<Long, List<FailedTokens>> failedWork) {
+    // Note we can't construct a ShardedKey and look it up in activeWork directly since
+    // HeartbeatResponse doesn't include the user key.
+    for (Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
+      List<FailedTokens> failedTokens = failedWork.get(entry.getKey().shardingKey());
+      if (failedTokens == null) continue;
+      for (FailedTokens failedToken : failedTokens) {
+        for (Work queuedWork : entry.getValue()) {
+          WorkItem workItem = queuedWork.getWorkItem();
+          if (workItem.getWorkToken() == failedToken.workToken
+              && workItem.getCacheToken() == failedToken.cacheToken) {
+            LOG.error(
+                "failing work "
+                    + entry.getKey().shardingKey()

Review Comment:
   Include the computation ID.
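   e.g. (assumes the computation ID gets plumbed into ActiveWorkState; it isn't available here today):
   ```java
   LOG.info(
       "Failing work for computation {} sharding key {} work token {} cache token {}.",
       computationId,
       entry.getKey().shardingKey(),
       workItem.getWorkToken(),
       workItem.getCacheToken());
   ```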



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1864,6 +1890,22 @@ private void sendWorkerUpdatesToDataflowService(
     }
   }
 
+  public void handleHeartbeatResponses(List<Windmill.ComputationHeartbeatResponse> responses) {
+    for (Windmill.ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
+      Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
+      for (Windmill.HeartbeatResponse heartbeatResponse :
+          computationHeartbeatResponse.getHeartbeatResponsesList()) {
+        failedWork.putIfAbsent(heartbeatResponse.getShardingKey(), new ArrayList<>());
+        failedWork
+            .get(heartbeatResponse.getShardingKey())

Review Comment:
   computeIfAbsent returns the mapped value, so you can avoid this get.
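   i.e. (sketch):
   ```java
   failedWork
       .computeIfAbsent(heartbeatResponse.getShardingKey(), key -> new ArrayList<>())
       .add(
           new FailedTokens(
               heartbeatResponse.getWorkToken(), heartbeatResponse.getCacheToken()));
   ```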



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1873,14 +1915,20 @@ private void sendWorkerUpdatesToDataflowService(
    */
   private void refreshActiveWork() {
     Map<String, List<Windmill.KeyedGetDataRequest>> active = new HashMap<>();
+    Map<String, List<Windmill.HeartbeatRequest>> heartbeats = new HashMap<>();
     Instant refreshDeadline =
         clock.get().minus(Duration.millis(options.getActiveWorkRefreshPeriodMillis()));
 
     for (Map.Entry<String, ComputationState> entry : computationMap.entrySet()) {
-      active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline, sampler));
+      if (windmillServiceEnabled
+          && DataflowRunner.hasExperiment(options, "send_new_heartbeat_requests")) {
+        heartbeats.put(entry.getKey(), entry.getValue().getKeyHeartbeats(refreshDeadline, sampler));
+      } else {
+        active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline, sampler));

Review Comment:
   Since it is the same information, can the switch on which type of request is sent to Windmill be moved into the actual RPC layer? Then you could just use the heartbeat objects internally and convert there if needed.

   I think that would be simpler than the somewhat duplicated methods and params, and it keeps things more self-contained.
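   Hypothetical shape (the method names and flag are illustrative, not from this PR):
   ```java
   // Callers always hand the stream HeartbeatRequests; the stream decides how to
   // encode them on the wire and converts to legacy requests only where needed.
   @Override
   public void refreshActiveWork(Map<String, List<HeartbeatRequest>> heartbeats) {
     if (sendNewHeartbeatRequests) {
       sendHeartbeats(heartbeats); // new ComputationHeartbeatRequest protos
     } else {
       sendAsKeyedGetDataRequests(heartbeats); // convert to the legacy encoding here
     }
   }
   ```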



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateReader.java:
##########
@@ -404,6 +415,9 @@ public void performReads() {
 
   private KeyedGetDataResponse tryGetDataFromWindmill(HashSet<StateTag<?>> stateTags)
       throws Exception {
+    if (workItemIsFailed.get()) {
+      throw new WorkItemFailedException("Windmill failed work item.");

Review Comment:
   Since the exception name already captures that it failed, perhaps the text would be better used to indicate the location, e.g. "Skipping state read as work item is no longer valid in backend".



##########
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto:
##########
@@ -784,6 +827,17 @@ message StreamingGetDataResponse {
   repeated bytes serialized_response = 2;
   // Remaining bytes field applies only to the last serialized_response
   optional int64 remaining_bytes_for_response = 3;
+
+  // Work item fingerprints available only for complete items. So if
+  // remaining_bytes_for_response > 0 the size of farmhash_fingerprint array is
+  // one less than the size of serialized_response as the fingerprint for the
+  // last message is not yet available.
+  // Will not be populated when heartbeat_response is set.
+  repeated uint64 farmhash_fingerprint = 4;

Review Comment:
   I'd say omit it, and either reserve the field number or just skip it. Since it's matching the internal proto, I'm not really worried that someone will add a bad field number here.


