scwhittle commented on code in PR #29082:
URL: https://github.com/apache/beam/pull/29082#discussion_r1366921540


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -108,9 +132,19 @@ synchronized ActivateWorkResult 
activateWorkForKey(ShardedKey shardedKey, Work w
     }
 
     // Ensure we don't already have this work token queued.
-    for (Work queuedWork : workQueue) {
+    Iterator<Work> workQueueIterator = workQueue.iterator();
+    while (workQueueIterator.hasNext()) {
+      Work queuedWork = workQueueIterator.next();
       if (queuedWork.getWorkItem().getWorkToken() == 
work.getWorkItem().getWorkToken()) {
-        return ActivateWorkResult.DUPLICATE;
+        // Work is a duplicate.
+        if (queuedWork.getWorkItem().getCacheToken() == 
work.getWorkItem().getCacheToken()) {

Review Comment:
   could be cleaner to use WorkDedupeKey (or whatever it is renamed to) and call 
methods like equals, or maybe something like knownEarlierThan
   
   If the cache token is the same, we'd prefer to keep the later work item #, 
since they are increasing from Windmill within a worker. Otherwise we can just 
take the last one observed.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -147,16 +182,22 @@ private synchronized void removeCompletedWorkFromQueue(
                 () ->
                     new IllegalStateException(
                         String.format(
-                            "Active key %s without work, expected token %d",
-                            shardedKey, workToken)));
+                            "Active key %s without work, expected work_token 
%d, expected cache_token %d",
+                            shardedKey, workDedupeKey.workToken(), 
workDedupeKey.cacheToken())));
 
-    if (completedWork.getWorkItem().getWorkToken() != workToken) {
+    if (completedWork.getWorkItem().getWorkToken() != workDedupeKey.workToken()
+        || completedWork.isRetryOf(workDedupeKey)) {
       // Work may have been completed due to clearing of stuck commits.
       LOG.warn(
-          "Unable to complete due to token mismatch for key {} and token {}, 
actual token was {}.",
+          "Unable to complete due to token mismatch for "
+              + "key {}, work_token {}, and cache_token {}; "
+              + "actual work_token was {},"
+              + "actual cache_token was {}.",
           shardedKey,
-          workToken,
-          completedWork.getWorkItem().getWorkToken());
+          workDedupeKey.workToken(),

Review Comment:
   add toString for new class and use it?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -125,6 +125,11 @@ boolean isStuckCommittingAt(Instant stuckCommitDeadline) {
         && currentState.startTime().isBefore(stuckCommitDeadline);
   }
 
+  boolean isRetryOf(WorkDedupeKey workDedupeKey) {

Review Comment:
   maybe instead put this method on WorkDedupeKey and have a method vending 
WorkDedupeKey from Work?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -178,21 +219,22 @@ private synchronized Optional<Work> 
getNextWork(Queue<Work> workQueue, ShardedKe
    * before the stuckCommitDeadline.
    */
   synchronized void invalidateStuckCommits(
-      Instant stuckCommitDeadline, BiConsumer<ShardedKey, Long> 
shardedKeyAndWorkTokenConsumer) {
-    for (Entry<ShardedKey, Long> shardedKeyAndWorkToken :
+      Instant stuckCommitDeadline,
+      BiConsumer<ShardedKey, WorkDedupeKey> shardedKeyAndWorkTokenConsumer) {
+    for (Entry<ShardedKey, WorkDedupeKey> shardedKeyAndWorkToken :
         getStuckCommitsAt(stuckCommitDeadline).entrySet()) {
       ShardedKey shardedKey = shardedKeyAndWorkToken.getKey();
-      long workToken = shardedKeyAndWorkToken.getValue();
+      WorkDedupeKey workDedupeKey = shardedKeyAndWorkToken.getValue();

Review Comment:
   update the variable names now that the type has changed (e.g. shardedKeyAndWorkToken now holds a WorkDedupeKey, not a work token)



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -94,6 +94,7 @@
 import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
 import org.apache.beam.runners.dataflow.worker.streaming.Work.State;
+import org.apache.beam.runners.dataflow.worker.streaming.WorkDedupeKey;

Review Comment:
   maybe name WorkIdentifiers? WorkTokens?
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkDedupeKey.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
+
+/**
+ * A composite key used to see if a unit of {@link Work} is a duplicate. If 
multiple units of {@link
+ * Work} have the same workToken AND cacheToken, the {@link Work} is a 
duplicate. If multiple units
+ * of {@link Work} have the same workToken, but different cacheTokens, the 
{@link Work} is a retry.
+ */
+@AutoValue
+public abstract class WorkDedupeKey {
+  public static WorkDedupeKey of(WorkItem workItem) {
+    return builder()
+        .setCacheToken(workItem.getCacheToken())
+        .setWorkToken(workItem.getWorkToken())
+        .build();
+  }
+
+  public static WorkDedupeKey of(Work work) {

Review Comment:
   remove this overload if Work changes to have a method that returns its WorkDedupeKey



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -147,16 +182,22 @@ private synchronized void removeCompletedWorkFromQueue(
                 () ->
                     new IllegalStateException(
                         String.format(
-                            "Active key %s without work, expected token %d",
-                            shardedKey, workToken)));
+                            "Active key %s without work, expected work_token 
%d, expected cache_token %d",
+                            shardedKey, workDedupeKey.workToken(), 
workDedupeKey.cacheToken())));
 
-    if (completedWork.getWorkItem().getWorkToken() != workToken) {
+    if (completedWork.getWorkItem().getWorkToken() != workDedupeKey.workToken()

Review Comment:
   use the new class and an equals method?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -83,6 +84,29 @@ static ActiveWorkState forTesting(
     return new ActiveWorkState(activeWork, computationStateCache);
   }
 
+  private static Stream<KeyedGetDataRequest> toKeyedGetDataRequestStream(
+      Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue, Instant 
refreshDeadline) {

Review Comment:
   have the ShardedKey and the queue be separate parameters?
   
   also could take just an Iterable instead of Deque



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java:
##########
@@ -139,34 +168,54 @@ public void testActivateWorkForKey_QUEUED() {
   public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() {
     assertEquals(
         Optional.empty(),
-        activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey("someKey", 
1L), 10L));
+        activeWorkState.completeWorkAndGetNextWorkForKey(
+            shardedKey("someKey", 1L), workDedupeToken(1L, 1L)));
   }
 
   @Test
-  public void 
testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueDoesNotMatchWorkToComplete()
 {
-    long workTokenToComplete = 1L;
+  public void
+      
testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueWorkTokenDoesNotMatchWorkToComplete()
 {
+    long workTokenInQueue = 2L;
+    long otherWorkToken = 1L;
+    long cacheToken = 1L;
+    Work workInQueue = createWork(createWorkItem(workTokenInQueue, 
cacheToken));
+    ShardedKey shardedKey = shardedKey("someKey", 1L);
+
+    activeWorkState.activateWorkForKey(shardedKey, workInQueue);
+    activeWorkState.completeWorkAndGetNextWorkForKey(
+        shardedKey, workDedupeToken(otherWorkToken, cacheToken));
+
+    assertEquals(1, readOnlyActiveWork.get(shardedKey).size());
+    assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek());
+  }
 
-    Work workInQueue = createWork(createWorkItem(2L));
+  @Test
+  public void
+      
testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueCacheTokenDoesNotMatchWorkToComplete()
 {
+    long cacheTokenInQueue = 2L;
+    long otherCacheToken = 1L;
+    long workToken = 1L;
+    Work workInQueue = createWork(createWorkItem(workToken, 
cacheTokenInQueue));
     ShardedKey shardedKey = shardedKey("someKey", 1L);
 
     activeWorkState.activateWorkForKey(shardedKey, workInQueue);
-    activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, 
workTokenToComplete);
+    activeWorkState.completeWorkAndGetNextWorkForKey(
+        shardedKey, workDedupeToken(workToken, otherCacheToken));
 
     assertEquals(1, readOnlyActiveWork.get(shardedKey).size());
     assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek());
   }
 
   @Test
   public void 
testCompleteWorkAndGetNextWorkForKey_removesWorkFromQueueWhenComplete() {
-    long workTokenToComplete = 1L;
-
-    Work activeWork = createWork(createWorkItem(workTokenToComplete));
-    Work nextWork = createWork(createWorkItem(2L));
+    Work activeWork = createWork(createWorkItem(1L, 1L));
+    Work nextWork = createWork(createWorkItem(2L, 2L));
     ShardedKey shardedKey = shardedKey("someKey", 1L);
 
     activeWorkState.activateWorkForKey(shardedKey, activeWork);
     activeWorkState.activateWorkForKey(shardedKey, nextWork);
-    activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, 
workTokenToComplete);
+    activeWorkState.completeWorkAndGetNextWorkForKey(
+        shardedKey, WorkDedupeKey.of(activeWork.getWorkItem()));

Review Comment:
   if we add a method to get the dedupe key on Work, this could be simplified; 
ditto below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to