This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 80cba5644c3 plumb backend worker token to work items (#32777)
80cba5644c3 is described below
commit 80cba5644c338599144c3b227bc1db6b10a039f0
Author: martin trieu <[email protected]>
AuthorDate: Tue Oct 15 04:44:20 2024 -0700
plumb backend worker token to work items (#32777)
---
.../dataflow/worker/streaming/ActiveWorkState.java | 24 +++++++++++++++----
.../runners/dataflow/worker/streaming/Work.java | 28 ++++++++++++++++++----
.../client/grpc/GrpcDirectGetWorkStream.java | 6 ++++-
3 files changed, 47 insertions(+), 11 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
index 4607096dd66..aec52cd7d9a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
@@ -240,18 +240,20 @@ public final class ActiveWorkState {
@Nullable Queue<ExecutableWork> workQueue = activeWork.get(shardedKey);
if (workQueue == null) {
// Work may have been completed due to clearing of stuck commits.
- LOG.warn("Unable to complete inactive work for key {} and token {}.",
shardedKey, workId);
+ LOG.warn(
+ "Unable to complete inactive work for key={} and token={}. Work
queue for key does not exist.",
+ shardedKey,
+ workId);
return Optional.empty();
}
+
removeCompletedWorkFromQueue(workQueue, shardedKey, workId);
return getNextWork(workQueue, shardedKey);
}
private synchronized void removeCompletedWorkFromQueue(
Queue<ExecutableWork> workQueue, ShardedKey shardedKey, WorkId workId) {
- // avoid Preconditions.checkState here to prevent eagerly evaluating the
- // format string parameters for the error message.
- ExecutableWork completedWork = workQueue.peek();
+ @Nullable ExecutableWork completedWork = workQueue.peek();
if (completedWork == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Active key {} without work, expected token {}", shardedKey,
workId);
@@ -337,8 +339,18 @@ public final class ActiveWorkState {
writer.println(
"<table border=\"1\" "
+
"style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
+ // Columns.
writer.println(
- "<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active
For</th><th>State</th><th>State Active For</th><th>Processing
Thread</th></tr>");
+ "<tr>"
+ + "<th>Key</th>"
+ + "<th>Token</th>"
+ + "<th>Queued</th>"
+ + "<th>Active For</th>"
+ + "<th>State</th>"
+ + "<th>State Active For</th>"
+ + "<th>Processing Thread</th>"
+ + "<th>Backend</th>"
+ + "</tr>");
// Use StringBuilder because we are appending in loop.
StringBuilder activeWorkStatus = new StringBuilder();
int commitsPendingCount = 0;
@@ -366,6 +378,8 @@ public final class ActiveWorkState {
activeWorkStatus.append(elapsedString(activeWork.getStateStartTime(),
now));
activeWorkStatus.append("</td><td>");
activeWorkStatus.append(activeWork.getProcessingThreadName());
+ activeWorkStatus.append("</td><td>");
+ activeWorkStatus.append(activeWork.backendWorkerToken());
activeWorkStatus.append("</td></tr>\n");
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
index 03d1e1ae469..6f97cbca9a8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
@@ -56,7 +56,7 @@ import org.joda.time.Instant;
/**
* Represents the state of an attempt to process a {@link WorkItem} by
executing user code.
*
- * @implNote Not thread safe, should not be executed or accessed by more than
1 thread at a time.
+ * @implNote Not thread safe, should not be modified by more than 1 thread at
a time.
*/
@NotThreadSafe
@Internal
@@ -70,7 +70,7 @@ public final class Work implements RefreshableWork {
private final Map<LatencyAttribution.State, Duration> totalDurationPerState;
private final WorkId id;
private final String latencyTrackingId;
- private TimedState currentState;
+ private volatile TimedState currentState;
private volatile boolean isFailed;
private volatile String processingThreadName = "";
@@ -111,7 +111,18 @@ public final class Work implements RefreshableWork {
GetDataClient getDataClient,
Consumer<Commit> workCommitter,
HeartbeatSender heartbeatSender) {
- return ProcessingContext.create(computationId, getDataClient,
workCommitter, heartbeatSender);
+ return ProcessingContext.create(
+ computationId, getDataClient, workCommitter, heartbeatSender, /*
backendWorkerToken= */ "");
+ }
+
+ public static ProcessingContext createProcessingContext(
+ String computationId,
+ GetDataClient getDataClient,
+ Consumer<Commit> workCommitter,
+ HeartbeatSender heartbeatSender,
+ String backendWorkerToken) {
+ return ProcessingContext.create(
+ computationId, getDataClient, workCommitter, heartbeatSender,
backendWorkerToken);
}
private static LatencyAttribution.Builder
createLatencyAttributionWithActiveLatencyBreakdown(
@@ -168,6 +179,10 @@ public final class Work implements RefreshableWork {
return processingContext.getDataClient().getSideInputData(request);
}
+ public String backendWorkerToken() {
+ return processingContext.backendWorkerToken();
+ }
+
public Watermarks watermarks() {
return watermarks;
}
@@ -351,9 +366,10 @@ public final class Work implements RefreshableWork {
String computationId,
GetDataClient getDataClient,
Consumer<Commit> workCommitter,
- HeartbeatSender heartbeatSender) {
+ HeartbeatSender heartbeatSender,
+ String backendWorkerToken) {
return new AutoValue_Work_ProcessingContext(
- computationId, getDataClient, heartbeatSender, workCommitter);
+ computationId, getDataClient, heartbeatSender, workCommitter,
backendWorkerToken);
}
/** Computation that the {@link Work} belongs to. */
@@ -370,6 +386,8 @@ public final class Work implements RefreshableWork {
*/
public abstract Consumer<Commit> workCommitter();
+ public abstract String backendWorkerToken();
+
private Optional<KeyedGetDataResponse> fetchKeyedState(KeyedGetDataRequest
request) {
return Optional.ofNullable(getDataClient().getStateData(computationId(),
request));
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
index 45d010d7cfa..19de998b1da 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
@@ -254,7 +254,11 @@ public final class GrpcDirectGetWorkStream
private Work.ProcessingContext createProcessingContext(String computationId)
{
return Work.createProcessingContext(
- computationId, getDataClient.get(), workCommitter.get()::commit,
heartbeatSender.get());
+ computationId,
+ getDataClient.get(),
+ workCommitter.get()::commit,
+ heartbeatSender.get(),
+ backendWorkerToken());
}
@Override