This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 24efe7619d9 Track windmill current active work budget. (#30048)
24efe7619d9 is described below
commit 24efe7619d9a0e5fae70a6d69beabb2d23f362b1
Author: martin trieu <[email protected]>
AuthorDate: Thu Feb 8 02:00:15 2024 -0800
Track windmill current active work budget. (#30048)
---
.../dataflow/worker/StreamingDataflowWorker.java | 6 +-
.../dataflow/worker/streaming/ActiveWorkState.java | 79 +++++++++++++++-----
.../runners/dataflow/worker/streaming/Work.java | 2 -
.../worker/streaming/ActiveWorkStateTest.java | 83 +++++++++++++++++++---
4 files changed, 142 insertions(+), 28 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 463ab953fae..e8ca3a2834f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1970,8 +1970,10 @@ public class StreamingDataflowWorker {
failedWork
.computeIfAbsent(heartbeatResponse.getShardingKey(), key -> new
ArrayList<>())
.add(
- new FailedTokens(
- heartbeatResponse.getWorkToken(),
heartbeatResponse.getCacheToken()));
+ FailedTokens.newBuilder()
+ .setWorkToken(heartbeatResponse.getWorkToken())
+ .setCacheToken(heartbeatResponse.getCacheToken())
+ .build());
}
}
ComputationState state =
computationMap.get(computationHeartbeatResponse.getComputationId());
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
index ff46356d956..b4b46932393 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker.streaming;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import com.google.auto.value.AutoValue;
import java.io.PrintWriter;
import java.util.ArrayDeque;
import java.util.Deque;
@@ -28,6 +29,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import javax.annotation.Nullable;
@@ -38,6 +40,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
import org.apache.beam.sdk.annotations.Internal;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
@@ -70,11 +73,19 @@ public final class ActiveWorkState {
@GuardedBy("this")
private final WindmillStateCache.ForComputation computationStateCache;
+ /**
+ * Current budget that is being processed or queued on the user worker.
Incremented when work is
+ * activated in {@link #activateWorkForKey(ShardedKey, Work)}, and
decremented when work is
+ * completed in {@link #completeWorkAndGetNextWorkForKey(ShardedKey, long)}.
+ */
+ private final AtomicReference<GetWorkBudget> activeGetWorkBudget;
+
private ActiveWorkState(
Map<ShardedKey, Deque<Work>> activeWork,
WindmillStateCache.ForComputation computationStateCache) {
this.activeWork = activeWork;
this.computationStateCache = computationStateCache;
+ this.activeGetWorkBudget = new AtomicReference<>(GetWorkBudget.noBudget());
}
static ActiveWorkState create(WindmillStateCache.ForComputation
computationStateCache) {
@@ -88,6 +99,12 @@ public final class ActiveWorkState {
return new ActiveWorkState(activeWork, computationStateCache);
}
+ private static String elapsedString(Instant start, Instant end) {
+ Duration activeFor = new Duration(start, end);
+ // Duration's toString always starts with "PT"; remove that here.
+ return activeFor.toString().substring(2);
+ }
+
/**
* Activates {@link Work} for the {@link ShardedKey}. Outcome can be 1 of 3
{@link
* ActivateWorkResult}
@@ -103,12 +120,12 @@ public final class ActiveWorkState {
*/
synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey,
Work work) {
Deque<Work> workQueue = activeWork.getOrDefault(shardedKey, new
ArrayDeque<>());
-
// This key does not have any work queued up on it. Create one, insert
Work, and mark the work
// to be executed.
if (!activeWork.containsKey(shardedKey) || workQueue.isEmpty()) {
workQueue.addLast(work);
activeWork.put(shardedKey, workQueue);
+ incrementActiveWorkBudget(work);
return ActivateWorkResult.EXECUTE;
}
@@ -121,16 +138,27 @@ public final class ActiveWorkState {
// Queue the work for later processing.
workQueue.addLast(work);
+ incrementActiveWorkBudget(work);
return ActivateWorkResult.QUEUED;
}
- public static final class FailedTokens {
- public long workToken;
- public long cacheToken;
+ @AutoValue
+ public abstract static class FailedTokens {
+ public static Builder newBuilder() {
+ return new AutoValue_ActiveWorkState_FailedTokens.Builder();
+ }
+
+ public abstract long workToken();
+
+ public abstract long cacheToken();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setWorkToken(long value);
- public FailedTokens(long workToken, long cacheToken) {
- this.workToken = workToken;
- this.cacheToken = cacheToken;
+ public abstract Builder setCacheToken(long value);
+
+ public abstract FailedTokens build();
}
}
@@ -148,17 +176,17 @@ public final class ActiveWorkState {
for (FailedTokens failedToken : failedTokens) {
for (Work queuedWork : entry.getValue()) {
WorkItem workItem = queuedWork.getWorkItem();
- if (workItem.getWorkToken() == failedToken.workToken
- && workItem.getCacheToken() == failedToken.cacheToken) {
+ if (workItem.getWorkToken() == failedToken.workToken()
+ && workItem.getCacheToken() == failedToken.cacheToken()) {
LOG.debug(
"Failing work "
+ computationStateCache.getComputation()
+ " "
+ entry.getKey().shardingKey()
+ " "
- + failedToken.workToken
+ + failedToken.workToken()
+ " "
- + failedToken.cacheToken
+ + failedToken.cacheToken()
+ ". The work will be retried and is not lost.");
queuedWork.setFailed();
break;
@@ -168,6 +196,16 @@ public final class ActiveWorkState {
}
}
+ private void incrementActiveWorkBudget(Work work) {
+ activeGetWorkBudget.updateAndGet(
+ getWorkBudget -> getWorkBudget.apply(1,
work.getWorkItem().getSerializedSize()));
+ }
+
+ private void decrementActiveWorkBudget(Work work) {
+ activeGetWorkBudget.updateAndGet(
+ getWorkBudget -> getWorkBudget.subtract(1,
work.getWorkItem().getSerializedSize()));
+ }
+
/**
* Removes the complete work from the {@link Queue<Work>}. The {@link Work}
is marked as completed
* if its workToken matches the one that is passed in. Returns the next
{@link Work} in the {@link
@@ -208,6 +246,7 @@ public final class ActiveWorkState {
// We consumed the matching work item.
workQueue.remove();
+ decrementActiveWorkBudget(completedWork);
}
private synchronized Optional<Work> getNextWork(Queue<Work> workQueue,
ShardedKey shardedKey) {
@@ -285,6 +324,15 @@ public final class ActiveWorkState {
.build());
}
+ /**
+ * Returns the current aggregate {@link GetWorkBudget} that is active on the
user worker. Active
+ * means that the work is received from Windmill, being processed or queued
to be processed in
+ * {@link ActiveWorkState}, and not committed back to Windmill.
+ */
+ GetWorkBudget currentActiveWorkBudget() {
+ return activeGetWorkBudget.get();
+ }
+
synchronized void printActiveWork(PrintWriter writer, Instant now) {
writer.println(
"<table border=\"1\" "
@@ -328,12 +376,11 @@ public final class ActiveWorkState {
writer.println(commitsPendingCount - MAX_PRINTABLE_COMMIT_PENDING_KEYS);
writer.println("<br>");
}
- }
- private static String elapsedString(Instant start, Instant end) {
- Duration activeFor = new Duration(start, end);
- // Duration's toString always starts with "PT"; remove that here.
- return activeFor.toString().substring(2);
+ writer.println("<br>");
+ writer.println("Current Active Work Budget: ");
+ writer.println(currentActiveWorkBudget());
+ writer.println("<br>");
}
enum ActivateWorkResult {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
index 8d4ba33a1ab..6c85c615af1 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
@@ -42,14 +42,12 @@ import org.joda.time.Instant;
@NotThreadSafe
public class Work implements Runnable {
-
private final Windmill.WorkItem workItem;
private final Supplier<Instant> clock;
private final Instant startTime;
private final Map<Windmill.LatencyAttribution.State, Duration>
totalDurationPerState;
private final Consumer<Work> processWorkFn;
private TimedState currentState;
-
private volatile boolean isFailed;
private Work(Windmill.WorkItem workItem, Supplier<Instant> clock,
Consumer<Work> processWorkFn) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
index b384bb03185..82ff24c03bb 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
@@ -32,12 +32,12 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
import
org.apache.beam.runners.dataflow.worker.streaming.ActiveWorkState.ActivateWorkResult;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
@@ -50,9 +50,9 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class ActiveWorkStateTest {
- @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
private final WindmillStateCache.ForComputation computationStateCache =
mock(WindmillStateCache.ForComputation.class);
+ @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
private Map<ShardedKey, Deque<Work>> readOnlyActiveWork;
private ActiveWorkState activeWorkState;
@@ -61,11 +61,7 @@ public class ActiveWorkStateTest {
return ShardedKey.create(ByteString.copyFromUtf8(str), shardKey);
}
- private static Work emptyWork() {
- return createWork(null);
- }
-
- private static Work createWork(@Nullable Windmill.WorkItem workItem) {
+ private static Work createWork(Windmill.WorkItem workItem) {
return Work.create(workItem, Instant::now, Collections.emptyList(), unused
-> {});
}
@@ -92,7 +88,8 @@ public class ActiveWorkStateTest {
@Test
public void testActivateWorkForKey_EXECUTE_unknownKey() {
ActivateWorkResult activateWorkResult =
- activeWorkState.activateWorkForKey(shardedKey("someKey", 1L),
emptyWork());
+ activeWorkState.activateWorkForKey(
+ shardedKey("someKey", 1L), createWork(createWorkItem(1L)));
assertEquals(ActivateWorkResult.EXECUTE, activateWorkResult);
}
@@ -214,6 +211,76 @@ public class ActiveWorkStateTest {
assertFalse(readOnlyActiveWork.containsKey(shardedKey));
}
+ @Test
+ public void
testCurrentActiveWorkBudget_correctlyAggregatesActiveWorkBudget_oneShardKey() {
+ ShardedKey shardedKey = shardedKey("someKey", 1L);
+ Work work1 = createWork(createWorkItem(1L));
+ Work work2 = createWork(createWorkItem(2L));
+
+ activeWorkState.activateWorkForKey(shardedKey, work1);
+ activeWorkState.activateWorkForKey(shardedKey, work2);
+
+ GetWorkBudget expectedActiveBudget1 =
+ GetWorkBudget.builder()
+ .setItems(2)
+ .setBytes(
+ work1.getWorkItem().getSerializedSize() +
work2.getWorkItem().getSerializedSize())
+ .build();
+
+
assertThat(activeWorkState.currentActiveWorkBudget()).isEqualTo(expectedActiveBudget1);
+
+ activeWorkState.completeWorkAndGetNextWorkForKey(
+ shardedKey, work1.getWorkItem().getWorkToken());
+
+ GetWorkBudget expectedActiveBudget2 =
+ GetWorkBudget.builder()
+ .setItems(1)
+ .setBytes(work1.getWorkItem().getSerializedSize())
+ .build();
+
+
assertThat(activeWorkState.currentActiveWorkBudget()).isEqualTo(expectedActiveBudget2);
+ }
+
+ @Test
+ public void
testCurrentActiveWorkBudget_correctlyAggregatesActiveWorkBudget_whenWorkCompleted()
{
+ ShardedKey shardedKey = shardedKey("someKey", 1L);
+ Work work1 = createWork(createWorkItem(1L));
+ Work work2 = createWork(createWorkItem(2L));
+
+ activeWorkState.activateWorkForKey(shardedKey, work1);
+ activeWorkState.activateWorkForKey(shardedKey, work2);
+ activeWorkState.completeWorkAndGetNextWorkForKey(
+ shardedKey, work1.getWorkItem().getWorkToken());
+
+ GetWorkBudget expectedActiveBudget =
+ GetWorkBudget.builder()
+ .setItems(1)
+ .setBytes(work1.getWorkItem().getSerializedSize())
+ .build();
+
+
assertThat(activeWorkState.currentActiveWorkBudget()).isEqualTo(expectedActiveBudget);
+ }
+
+ @Test
+ public void
testCurrentActiveWorkBudget_correctlyAggregatesActiveWorkBudget_multipleShardKeys()
{
+ ShardedKey shardedKey1 = shardedKey("someKey", 1L);
+ ShardedKey shardedKey2 = shardedKey("someKey", 2L);
+ Work work1 = createWork(createWorkItem(1L));
+ Work work2 = createWork(createWorkItem(2L));
+
+ activeWorkState.activateWorkForKey(shardedKey1, work1);
+ activeWorkState.activateWorkForKey(shardedKey2, work2);
+
+ GetWorkBudget expectedActiveBudget =
+ GetWorkBudget.builder()
+ .setItems(2)
+ .setBytes(
+ work1.getWorkItem().getSerializedSize() +
work2.getWorkItem().getSerializedSize())
+ .build();
+
+
assertThat(activeWorkState.currentActiveWorkBudget()).isEqualTo(expectedActiveBudget);
+ }
+
@Test
public void testInvalidateStuckCommits() {
Map<ShardedKey, Long> invalidatedCommits = new HashMap<>();