scwhittle commented on code in PR #29082:
URL: https://github.com/apache/beam/pull/29082#discussion_r1384859967


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -107,10 +140,26 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
       return ActivateWorkResult.EXECUTE;
     }
 
-    // Ensure we don't already have this work token queued.
-    for (Work queuedWork : workQueue) {
-      if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) {
+    // Check to see if we have this work token queued.
+    Iterator<Work> workIterator = workQueue.iterator();
+    while (workIterator.hasNext()) {
+      Work queuedWork = workIterator.next();
+      if (queuedWork.id().equals(work.id())) {
         return ActivateWorkResult.DUPLICATE;
+      } else if (queuedWork.id().cacheToken() == work.id().cacheToken()) {
+        if (work.id().workToken() > queuedWork.id().workToken()) {
+          removeIfNotActive(queuedWork, workIterator, workQueue);
+          workQueue.addLast(work);

Review Comment:
   Could this `continue` instead of returning? Later iterations might be able to remove other queued items.
   
   For example, if all items have the same cache token and the queue is
   [1 active] [2 queued]
   and [3] arrives, we currently don't remove [1] since it's active, but by returning early we also never remove [2].
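
A minimal sketch of the behaviour I have in mind, using hypothetical stand-in types (`QueuedWork`, `activate`) rather than the real `Work` and `ActiveWorkState` classes, and assuming the head of the queue is the active item. It keeps scanning after finding an older work token, so every non-active entry with the same cache token is dropped before the new work is queued.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical stand-ins for illustration only; these are not the Beam classes.
final class ContinueScanSketch {
  enum Result { DUPLICATE, STALE, QUEUED }

  static final class QueuedWork {
    final long cacheToken;
    final long workToken;

    QueuedWork(long cacheToken, long workToken) {
      this.cacheToken = cacheToken;
      this.workToken = workToken;
    }
  }

  // Assumption: the head of the queue is the active item and is never removed.
  static Result activate(Deque<QueuedWork> queue, QueuedWork work) {
    QueuedWork active = queue.peekFirst();
    Iterator<QueuedWork> it = queue.iterator();
    while (it.hasNext()) {
      QueuedWork queued = it.next();
      if (queued.cacheToken != work.cacheToken) {
        continue; // Different cache token: leave this entry alone.
      }
      if (queued.workToken == work.workToken) {
        return Result.DUPLICATE;
      }
      if (queued.workToken > work.workToken) {
        return Result.STALE;
      }
      // Older work with the same cache token: drop it unless it is active,
      // then keep scanning instead of returning.
      if (queued != active) {
        it.remove();
      }
    }
    queue.addLast(work);
    return Result.QUEUED;
  }

  public static void main(String[] args) {
    Deque<QueuedWork> queue = new ArrayDeque<>();
    queue.addLast(new QueuedWork(7, 1)); // [1 active]
    queue.addLast(new QueuedWork(7, 2)); // [2 queued]
    Result result = activate(queue, new QueuedWork(7, 3)); // [3] arrives
    System.out.println(result + ", queue size " + queue.size());
  }
}
```

With the example above, the queue ends up as [1 active] [3 queued]: [1] stays because it is active, and [2] is removed because the scan did not stop at [1].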



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -83,6 +85,33 @@ static ActiveWorkState forTesting(
     return new ActiveWorkState(activeWork, computationStateCache);
   }
 
+  private static Stream<KeyedGetDataRequest> toKeyedGetDataRequests(

Review Comment:
   nit: this is just moved, but since it is now further from its context, how about a better name capturing that this is for heartbeats?
   
   makeHeartbeatKeyedGetDataRequests?
   
   



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2725,14 +2725,14 @@ public void testActiveWorkForShardedKeys() throws Exception {
 
     // Verify a different shard of key is a separate queue.
     Work m4 = createMockWork(3);
-    assertFalse(computationState.activateWork(key1Shard1, m4));
+    assertTrue(computationState.activateWork(key1Shard1, m4));

Review Comment:
   Why did this change? It seems like m4 should be a duplicate of m3. I think this might be related to the early return I commented on in the code.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -147,16 +199,19 @@ private synchronized void removeCompletedWorkFromQueue(
                 () ->
                     new IllegalStateException(
                         String.format(
-                            "Active key %s without work, expected token %d",
-                            shardedKey, workToken)));
+                            "Active key %s without work, expected work_token %d, expected cache_token %d",

Review Comment:
   log workId instead of separate fields?
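
Roughly what I mean, as a sketch only: the `WorkId` class below is a hypothetical stand-in with a hand-written `toString()`, and `missingWorkMessage` is an illustrative helper, not code from this PR. The point is that a single `%s` on the id keeps the message in sync if fields are added later.

```java
// Hypothetical stand-in for illustration; the real WorkId may format differently.
final class WorkIdLoggingSketch {
  static final class WorkId {
    final long cacheToken;
    final long workToken;

    WorkId(long cacheToken, long workToken) {
      this.cacheToken = cacheToken;
      this.workToken = workToken;
    }

    @Override
    public String toString() {
      return "WorkId{cacheToken=" + cacheToken + ", workToken=" + workToken + "}";
    }
  }

  // Formats the whole id with one %s instead of one %d per field.
  static String missingWorkMessage(String shardedKey, WorkId expected) {
    return String.format("Active key %s without work, expected %s", shardedKey, expected);
  }

  public static void main(String[] args) {
    System.out.println(missingWorkMessage("key1", new WorkId(7, 3)));
  }
}
```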



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -107,10 +140,26 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
       return ActivateWorkResult.EXECUTE;
     }
 
-    // Ensure we don't already have this work token queued.
-    for (Work queuedWork : workQueue) {
-      if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) {
+    // Check to see if we have this work token queued.
+    Iterator<Work> workIterator = workQueue.iterator();
+    while (workIterator.hasNext()) {
+      Work queuedWork = workIterator.next();
+      if (queuedWork.id().equals(work.id())) {
         return ActivateWorkResult.DUPLICATE;
+      } else if (queuedWork.id().cacheToken() == work.id().cacheToken()) {
+        if (work.id().workToken() > queuedWork.id().workToken()) {
+          removeIfNotActive(queuedWork, workIterator, workQueue);
+          workQueue.addLast(work);
+          return ActivateWorkResult.QUEUED;
+        } else {
+          return ActivateWorkResult.STALE;
+        }
+      } else if (queuedWork.id().workToken() == work.id().workToken()) {
+        if (queuedWork.id().cacheToken() != work.id().cacheToken()) {

Review Comment:
   This inner check is already known to be true from the previous condition in the else-if chain.
   
   I think the case
   queuedWork.id().workToken() == work.id().workToken()
   could just be removed, though, so that it falls through to the bottom. If the cache tokens are different, we aren't actually sure which item is the valid one in the backend. In that case it is safer to keep both, since then we will eventually complete the right one.
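
A sketch of the simplified decision for a single queued item, under the assumption described above that differing cache tokens always mean "keep both". The class, enum, and method names are hypothetical and only illustrate the shape of the chain once the work-token-only branch is gone.

```java
// Hypothetical helper for illustration; names are not from the Beam codebase.
final class TokenComparisonSketch {
  enum Outcome { DUPLICATE, SUPERSEDES_QUEUED, STALE, KEEP_BOTH }

  static Outcome compare(long queuedCache, long queuedWork, long newCache, long newWork) {
    if (queuedCache != newCache) {
      // Different cache tokens: we cannot tell which item the backend considers
      // valid, so keep both regardless of the work tokens.
      return Outcome.KEEP_BOTH;
    }
    if (queuedWork == newWork) {
      return Outcome.DUPLICATE; // Same cache token and same work token.
    }
    // Same cache token: the larger work token wins.
    return newWork > queuedWork ? Outcome.SUPERSEDES_QUEUED : Outcome.STALE;
  }

  public static void main(String[] args) {
    // Same work token but different cache tokens falls through to KEEP_BOTH.
    System.out.println(compare(1, 5, 2, 5));
  }
}
```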
   
   



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java:
##########
@@ -58,26 +57,27 @@ private static ShardedKey shardedKey(String str, long shardKey) {
     return ShardedKey.create(ByteString.copyFromUtf8(str), shardKey);
   }
 
-  private static Work emptyWork() {
-    return createWork(null);
-  }
-
-  private static Work createWork(@Nullable Windmill.WorkItem workItem) {
+  private static Work createWork(WorkItem workItem) {
     return Work.create(workItem, Instant::now, Collections.emptyList(), unused -> {});
   }
 
-  private static Work expiredWork(Windmill.WorkItem workItem) {
+  private static Work expiredWork(WorkItem workItem) {
     return Work.create(workItem, () -> Instant.EPOCH, Collections.emptyList(), unused -> {});
   }
 
-  private static Windmill.WorkItem createWorkItem(long workToken) {
-    return Windmill.WorkItem.newBuilder()
+  private static WorkItem createWorkItem(long workToken, long cacheToken) {
+    return WorkItem.newBuilder()
         .setKey(ByteString.copyFromUtf8(""))
         .setShardingKey(1)
         .setWorkToken(workToken)
+        .setCacheToken(cacheToken)
         .build();
   }
 
+  private static WorkId workDedupeToken(long workToken, long cacheToken) {

Review Comment:
   nit: name workId?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -83,6 +85,33 @@ static ActiveWorkState forTesting(
     return new ActiveWorkState(activeWork, computationStateCache);
   }
 
+  private static Stream<KeyedGetDataRequest> toKeyedGetDataRequests(
+      ShardedKey shardedKey, Collection<Work> workQueue, Instant refreshDeadline) {
+    return workQueue.stream()
+        .filter(work -> work.getStartTime().isBefore(refreshDeadline))
+        .map(
+            work ->
+                Windmill.KeyedGetDataRequest.newBuilder()
+                    .setKey(shardedKey.key())

Review Comment:
   This could be done separately (and possibly in other places where we make reads or commits), since this code is just moved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
