This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 80cba5644c3 plumb backend worker token to work items (#32777)
80cba5644c3 is described below

commit 80cba5644c338599144c3b227bc1db6b10a039f0
Author: martin trieu <[email protected]>
AuthorDate: Tue Oct 15 04:44:20 2024 -0700

    plumb backend worker token to work items (#32777)
---
 .../dataflow/worker/streaming/ActiveWorkState.java | 24 +++++++++++++++----
 .../runners/dataflow/worker/streaming/Work.java    | 28 ++++++++++++++++++----
 .../client/grpc/GrpcDirectGetWorkStream.java       |  6 ++++-
 3 files changed, 47 insertions(+), 11 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
index 4607096dd66..aec52cd7d9a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
@@ -240,18 +240,20 @@ public final class ActiveWorkState {
     @Nullable Queue<ExecutableWork> workQueue = activeWork.get(shardedKey);
     if (workQueue == null) {
       // Work may have been completed due to clearing of stuck commits.
-      LOG.warn("Unable to complete inactive work for key {} and token {}.", 
shardedKey, workId);
+      LOG.warn(
+          "Unable to complete inactive work for key={} and token={}.  Work 
queue for key does not exist.",
+          shardedKey,
+          workId);
       return Optional.empty();
     }
+
     removeCompletedWorkFromQueue(workQueue, shardedKey, workId);
     return getNextWork(workQueue, shardedKey);
   }
 
   private synchronized void removeCompletedWorkFromQueue(
       Queue<ExecutableWork> workQueue, ShardedKey shardedKey, WorkId workId) {
-    // avoid Preconditions.checkState here to prevent eagerly evaluating the
-    // format string parameters for the error message.
-    ExecutableWork completedWork = workQueue.peek();
+    @Nullable ExecutableWork completedWork = workQueue.peek();
     if (completedWork == null) {
       // Work may have been completed due to clearing of stuck commits.
       LOG.warn("Active key {} without work, expected token {}", shardedKey, 
workId);
@@ -337,8 +339,18 @@ public final class ActiveWorkState {
     writer.println(
         "<table border=\"1\" "
             + 
"style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
+    // Columns.
     writer.println(
-        "<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active 
For</th><th>State</th><th>State Active For</th><th>Processing 
Thread</th></tr>");
+        "<tr>"
+            + "<th>Key</th>"
+            + "<th>Token</th>"
+            + "<th>Queued</th>"
+            + "<th>Active For</th>"
+            + "<th>State</th>"
+            + "<th>State Active For</th>"
+            + "<th>Processing Thread</th>"
+            + "<th>Backend</th>"
+            + "</tr>");
     // Use StringBuilder because we are appending in loop.
     StringBuilder activeWorkStatus = new StringBuilder();
     int commitsPendingCount = 0;
@@ -366,6 +378,8 @@ public final class ActiveWorkState {
       activeWorkStatus.append(elapsedString(activeWork.getStateStartTime(), 
now));
       activeWorkStatus.append("</td><td>");
       activeWorkStatus.append(activeWork.getProcessingThreadName());
+      activeWorkStatus.append("</td><td>");
+      activeWorkStatus.append(activeWork.backendWorkerToken());
       activeWorkStatus.append("</td></tr>\n");
     }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
index 03d1e1ae469..6f97cbca9a8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
@@ -56,7 +56,7 @@ import org.joda.time.Instant;
 /**
  * Represents the state of an attempt to process a {@link WorkItem} by 
executing user code.
  *
- * @implNote Not thread safe, should not be executed or accessed by more than 
1 thread at a time.
+ * @implNote Not thread safe, should not be modified by more than 1 thread at 
a time.
  */
 @NotThreadSafe
 @Internal
@@ -70,7 +70,7 @@ public final class Work implements RefreshableWork {
   private final Map<LatencyAttribution.State, Duration> totalDurationPerState;
   private final WorkId id;
   private final String latencyTrackingId;
-  private TimedState currentState;
+  private volatile TimedState currentState;
   private volatile boolean isFailed;
   private volatile String processingThreadName = "";
 
@@ -111,7 +111,18 @@ public final class Work implements RefreshableWork {
       GetDataClient getDataClient,
       Consumer<Commit> workCommitter,
       HeartbeatSender heartbeatSender) {
-    return ProcessingContext.create(computationId, getDataClient, 
workCommitter, heartbeatSender);
+    return ProcessingContext.create(
+        computationId, getDataClient, workCommitter, heartbeatSender, /* 
backendWorkerToken= */ "");
+  }
+
+  public static ProcessingContext createProcessingContext(
+      String computationId,
+      GetDataClient getDataClient,
+      Consumer<Commit> workCommitter,
+      HeartbeatSender heartbeatSender,
+      String backendWorkerToken) {
+    return ProcessingContext.create(
+        computationId, getDataClient, workCommitter, heartbeatSender, 
backendWorkerToken);
   }
 
   private static LatencyAttribution.Builder 
createLatencyAttributionWithActiveLatencyBreakdown(
@@ -168,6 +179,10 @@ public final class Work implements RefreshableWork {
     return processingContext.getDataClient().getSideInputData(request);
   }
 
+  public String backendWorkerToken() {
+    return processingContext.backendWorkerToken();
+  }
+
   public Watermarks watermarks() {
     return watermarks;
   }
@@ -351,9 +366,10 @@ public final class Work implements RefreshableWork {
         String computationId,
         GetDataClient getDataClient,
         Consumer<Commit> workCommitter,
-        HeartbeatSender heartbeatSender) {
+        HeartbeatSender heartbeatSender,
+        String backendWorkerToken) {
       return new AutoValue_Work_ProcessingContext(
-          computationId, getDataClient, heartbeatSender, workCommitter);
+          computationId, getDataClient, heartbeatSender, workCommitter, 
backendWorkerToken);
     }
 
     /** Computation that the {@link Work} belongs to. */
@@ -370,6 +386,8 @@ public final class Work implements RefreshableWork {
      */
     public abstract Consumer<Commit> workCommitter();
 
+    public abstract String backendWorkerToken();
+
     private Optional<KeyedGetDataResponse> fetchKeyedState(KeyedGetDataRequest 
request) {
       return Optional.ofNullable(getDataClient().getStateData(computationId(), 
request));
     }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
index 45d010d7cfa..19de998b1da 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
@@ -254,7 +254,11 @@ public final class GrpcDirectGetWorkStream
 
   private Work.ProcessingContext createProcessingContext(String computationId) 
{
     return Work.createProcessingContext(
-        computationId, getDataClient.get(), workCommitter.get()::commit, 
heartbeatSender.get());
+        computationId,
+        getDataClient.get(),
+        workCommitter.get()::commit,
+        heartbeatSender.get(),
+        backendWorkerToken());
   }
 
   @Override

Reply via email to