scwhittle commented on code in PR #30048:
URL: https://github.com/apache/beam/pull/30048#discussion_r1469562154
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -70,11 +73,21 @@ public final class ActiveWorkState {
@GuardedBy("this")
private final WindmillStateCache.ForComputation computationStateCache;
+ /**
+ * Current budget that is being processed or queued on the user worker. Incremented when work is
+ * activated in {@link #activateWorkForKey(ShardedKey, Work)}, and decremented when work is
+ * completed in {@link #completeWorkAndGetNextWorkForKey(ShardedKey, long)}.
+ *
+ * @implNote Reads are lock free using {@link AtomicReference#get()}, writes are synchronized.
Review Comment:
an alternative is to use AtomicReference.updateAndGet, if the writes don't
need to synchronize with anything else
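A minimal sketch of that alternative, assuming the budget is an immutable value. `Budget` and `ActiveBudgetTracker` are hypothetical stand-ins for illustration, not the classes in this PR:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for GetWorkBudget: an immutable (items, bytes) pair.
final class Budget {
  final long items;
  final long bytes;

  Budget(long items, long bytes) {
    this.items = items;
    this.bytes = bytes;
  }

  // Returns a new Budget with the deltas added; never mutates this instance.
  Budget apply(long itemsDelta, long bytesDelta) {
    return new Budget(items + itemsDelta, bytes + bytesDelta);
  }
}

// Hypothetical holder showing lock-free writes via updateAndGet.
final class ActiveBudgetTracker {
  private final AtomicReference<Budget> active =
      new AtomicReference<>(new Budget(0, 0));

  // Atomic read-modify-write without a lock. The update function may be
  // re-run under contention, so it must be free of side effects.
  void increment(long serializedSize) {
    active.updateAndGet(b -> b.apply(1, serializedSize));
  }

  void decrement(long serializedSize) {
    active.updateAndGet(b -> b.apply(-1, -serializedSize));
  }

  // Lock-free read of the current value.
  Budget current() {
    return active.get();
  }
}
```

This only replaces `synchronized` safely if the budget update does not have to be atomic with other state guarded by the same lock.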
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -168,6 +198,18 @@ synchronized void failWorkForKey(Map<Long, List<FailedTokens>> failedWork) {
}
}
+ private synchronized void incrementActiveWorkBudget(Work work) {
+ GetWorkBudget currentActiveWorkBudget = activeGetWorkBudget.get();
+ activeGetWorkBudget.set(
+ currentActiveWorkBudget.apply(1, work.getWorkItem().getSerializedSize()));
+ }
+
+ private synchronized void decrementActiveWorkBudget(Work work) {
Review Comment:
should we have some guards against going negative?
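One possible guard, sketched on a plain counter, is to clamp at zero. `NonNegativeCounter` is a hypothetical name, and whether clamping, logging, or failing fast is the right behavior here is an open question for the PR:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical counter that never drops below zero.
final class NonNegativeCounter {
  private final AtomicLong value = new AtomicLong();

  void add(long delta) {
    value.addAndGet(delta);
  }

  // Subtracts delta atomically, clamping the result at zero so an
  // unmatched or duplicate decrement cannot make the total negative.
  long subtractClamped(long delta) {
    return value.updateAndGet(v -> Math.max(0L, v - delta));
  }

  long get() {
    return value.get();
  }
}
```

Clamping hides accounting bugs rather than surfacing them, so a log line or metric on the clamped path may be worth adding.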
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -288,6 +331,15 @@ private static Stream<HeartbeatRequest> toHeartbeatRequestStream(
.build());
}
+ /**
+ * Returns the current aggregate {@link GetWorkBudget} that is active on the user worker. Active
+ * means that the work is received from Windmill, being processed or queued to be processed in
+ * {@link ActiveWorkState}, and not committed back to Windmill.
+ */
+ GetWorkBudget currentActiveWorkBudget() {
+ return activeGetWorkBudget.get();
+ }
+
synchronized void printActiveWork(PrintWriter writer, Instant now) {
Review Comment:
can you add the active work budget to the status page?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -121,16 +140,27 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
// Queue the work for later processing.
workQueue.addLast(work);
+ incrementActiveWorkBudget(work);
return ActivateWorkResult.QUEUED;
}
- public static final class FailedTokens {
- public long workToken;
- public long cacheToken;
+ @AutoValue
+ public abstract static class FailedTokens {
+ public abstract long workToken();
+
+ public abstract long cacheToken();
+
+ public static Builder newBuilder() {
+ return new AutoValue_ActiveWorkState_FailedTokens.Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setWorkToken(long value);
- public FailedTokens(long workToken, long cacheToken) {
- this.workToken = workToken;
- this.cacheToken = cacheToken;
+ public abstract Builder setCacheToken(long value);
+
+ public abstract FailedTokens build();
Review Comment:
indent looks off