scwhittle commented on code in PR #29082:
URL: https://github.com/apache/beam/pull/29082#discussion_r1369967563


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -126,37 +164,40 @@ synchronized ActivateWorkResult 
activateWorkForKey(ShardedKey shardedKey, Work w
    * #activeWork}.
    */
   synchronized Optional<Work> completeWorkAndGetNextWorkForKey(
-      ShardedKey shardedKey, long workToken) {
+      ShardedKey shardedKey, WorkId workId) {
     @Nullable Queue<Work> workQueue = activeWork.get(shardedKey);
     if (workQueue == null) {
       // Work may have been completed due to clearing of stuck commits.
-      LOG.warn("Unable to complete inactive work for key {} and token {}.", 
shardedKey, workToken);
+      LOG.warn("Unable to complete inactive work for key {} and token {}.", 
shardedKey, workId);
       return Optional.empty();
     }
-    removeCompletedWorkFromQueue(workQueue, shardedKey, workToken);
+    removeCompletedWorkFromQueue(workQueue, shardedKey, workId);
     return getNextWork(workQueue, shardedKey);
   }
 
   private synchronized void removeCompletedWorkFromQueue(
-      Queue<Work> workQueue, ShardedKey shardedKey, long workToken) {
+      Queue<Work> workQueue, ShardedKey shardedKey, WorkId workId) {
     // avoid Preconditions.checkState here to prevent eagerly evaluating the
     // format string parameters for the error message.
-    Work completedWork =
+    Work workToComplete =
         Optional.ofNullable(workQueue.peek())
             .orElseThrow(
                 () ->
                     new IllegalStateException(
                         String.format(
-                            "Active key %s without work, expected token %d",
-                            shardedKey, workToken)));
+                            "Active key %s without work, expected work_token 
%d, expected cache_token %d",
+                            shardedKey, workId.workToken(), 
workId.cacheToken())));
 
-    if (completedWork.getWorkItem().getWorkToken() != workToken) {
+    if (!workToComplete.id().isForSameWork(workId) || 
workToComplete.id().isRetryOf(workId)) {

Review Comment:
   Just use `!equals` here, i.e. `!workToComplete.id().equals(workId)`?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -126,37 +164,40 @@ synchronized ActivateWorkResult 
activateWorkForKey(ShardedKey shardedKey, Work w
    * #activeWork}.
    */
   synchronized Optional<Work> completeWorkAndGetNextWorkForKey(
-      ShardedKey shardedKey, long workToken) {
+      ShardedKey shardedKey, WorkId workId) {
     @Nullable Queue<Work> workQueue = activeWork.get(shardedKey);
     if (workQueue == null) {
       // Work may have been completed due to clearing of stuck commits.
-      LOG.warn("Unable to complete inactive work for key {} and token {}.", 
shardedKey, workToken);
+      LOG.warn("Unable to complete inactive work for key {} and token {}.", 
shardedKey, workId);

Review Comment:
   nit: replace "token" with "ids" in the log message.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -108,10 +130,26 @@ synchronized ActivateWorkResult 
activateWorkForKey(ShardedKey shardedKey, Work w
     }
 
     // Ensure we don't already have this work token queued.
-    for (Work queuedWork : workQueue) {
-      if (queuedWork.getWorkItem().getWorkToken() == 
work.getWorkItem().getWorkToken()) {
+    Iterator<Work> workQueueIterator = workQueue.iterator();
+    while (workQueueIterator.hasNext()) {
+      Work queuedWork = workQueueIterator.next();
+
+      // Work tokens and cache tokens are equal.
+      if (queuedWork.id().equals(work.id())) {
+        if (work.newerThan(queuedWork)) {
+          workQueueIterator.remove();
+          workQueue.addLast(work);
+          return ActivateWorkResult.QUEUED;
+        }
         return ActivateWorkResult.DUPLICATE;
       }
+
+      // Retries have the same work token, but different cache tokens.
+      if (work.isRetryOf(queuedWork)) {
+        workQueueIterator.remove();

Review Comment:
   For these cases where we're removing, I'm not sure we should remove the item 
if the iterator is at the first element of the queue. In that case it is the 
active item, and removing it doesn't actually terminate the active work (we may 
add support for this in the future, in which case we can improve the handling 
here). Removing it will just lead to confusing logs below about mismatched 
tokens when the active work finishes.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -125,6 +131,18 @@ boolean isStuckCommittingAt(Instant stuckCommitDeadline) {
         && currentState.startTime().isBefore(stuckCommitDeadline);
   }
 
+  public WorkId id() {
+    return id;
+  }
+
+  boolean isRetryOf(Work other) {
+    return id.isRetryOf(other.id);
+  }
+
+  boolean newerThan(Work other) {

Review Comment:
   See the other comment; I was thinking of this as a WorkId method:
   
   boolean knownObsoletedBy(WorkId other) {
     return other.cacheToken == this.cacheToken && workToken < other.workToken;
   }
   
   In that case I think the forwarding Work.isRetryOf method could be removed, 
and callers could just use id().isRetryOf.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -178,21 +219,21 @@ private synchronized Optional<Work> 
getNextWork(Queue<Work> workQueue, ShardedKe
    * before the stuckCommitDeadline.
    */
   synchronized void invalidateStuckCommits(
-      Instant stuckCommitDeadline, BiConsumer<ShardedKey, Long> 
shardedKeyAndWorkTokenConsumer) {
-    for (Entry<ShardedKey, Long> shardedKeyAndWorkToken :
+      Instant stuckCommitDeadline, BiConsumer<ShardedKey, WorkId> 
shardedKeyAndWorkIdConsumer) {
+    for (Entry<ShardedKey, WorkId> shardedKeyAndWorkToken :

Review Comment:
   nit: update the variable name to refer to the id rather than the token 
(e.g. shardedKeyAndWorkId).



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -126,37 +164,40 @@ synchronized ActivateWorkResult 
activateWorkForKey(ShardedKey shardedKey, Work w
    * #activeWork}.
    */
   synchronized Optional<Work> completeWorkAndGetNextWorkForKey(
-      ShardedKey shardedKey, long workToken) {
+      ShardedKey shardedKey, WorkId workId) {
     @Nullable Queue<Work> workQueue = activeWork.get(shardedKey);
     if (workQueue == null) {
       // Work may have been completed due to clearing of stuck commits.
-      LOG.warn("Unable to complete inactive work for key {} and token {}.", 
shardedKey, workToken);
+      LOG.warn("Unable to complete inactive work for key {} and token {}.", 
shardedKey, workId);
       return Optional.empty();
     }
-    removeCompletedWorkFromQueue(workQueue, shardedKey, workToken);
+    removeCompletedWorkFromQueue(workQueue, shardedKey, workId);
     return getNextWork(workQueue, shardedKey);
   }
 
   private synchronized void removeCompletedWorkFromQueue(
-      Queue<Work> workQueue, ShardedKey shardedKey, long workToken) {
+      Queue<Work> workQueue, ShardedKey shardedKey, WorkId workId) {
     // avoid Preconditions.checkState here to prevent eagerly evaluating the
     // format string parameters for the error message.
-    Work completedWork =
+    Work workToComplete =

Review Comment:
   nit: I think the previous name (completedWork) was better, since the work 
has already completed user processing.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * A composite key used to identify a unit of {@link Work}. If multiple units 
of {@link Work} have
+ * the same workToken AND cacheToken, the {@link Work} is a duplicate. If 
multiple units of {@link
+ * Work} have the same workToken, but different cacheTokens, the {@link Work} 
is a retry.

Review Comment:
   The comment could also document the case where the cacheToken is the same 
but the workToken is different: the work is known to be obsolete.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * A composite key used to identify a unit of {@link Work}. If multiple units 
of {@link Work} have
+ * the same workToken AND cacheToken, the {@link Work} is a duplicate. If 
multiple units of {@link
+ * Work} have the same workToken, but different cacheTokens, the {@link Work} 
is a retry.
+ */
+@AutoValue
+public abstract class WorkId {
+
+  public static Builder builder() {
+    return new AutoValue_WorkId.Builder();
+  }
+
+  abstract long cacheToken();
+
+  abstract long workToken();
+
+  boolean isRetryOf(WorkId other) {
+    return other.workToken() == workToken() && other.cacheToken() != 
cacheToken();
+  }
+
+  boolean isForSameWork(WorkId other) {

Review Comment:
   Remove? It seems like the accessors can just be called directly.
   
   Also, we have some weird cases (backend truncation of a work item) where the 
work token can be the same but the work is not entirely the same if the cache 
token has changed, so the method name is perhaps confusing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to