scwhittle commented on code in PR #29156:
URL: https://github.com/apache/beam/pull/29156#discussion_r1374232497


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/streams/grpc/GrpcWindmillServer.java:
##########
@@ -0,0 +1,358 @@
+/*

Review Comment:
   Can the git metadata be updated so this is a move from its previous 
location instead of a separate delete/add?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/streams/observers/DirectStreamObserver.java:
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.streams.observers;

Review Comment:
   StreamObserver/CallStreamObserver are grpc specific; should they just be in 
the grpc package, and the observers package be removed?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/util/StreamingEngineThrottleTimers.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   Would rather avoid a generic util package.  It seems like most of these 
things in util are grpc specific. Perhaps they could go in the streams.grpc 
package instead?
   
   If you were thinking they were not stream related, I wonder if we should 
rename the stream package to stub, client, etc., showing it is the windmill rpc 
client code, and put both the channel stuff and stream stuff in there.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/util/ThrottleTimer.java:
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+package org.apache.beam.runners.dataflow.worker.windmill.util;

Review Comment:
   ditto in streams? not used otherwise



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/streams/observers/StreamObserverFactory.java:
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.streams.observers;

Review Comment:
   move to grpc?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/streams/grpc/GrpcWindmillStreamFactory.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.streams.grpc;
+
+import static 
org.apache.beam.runners.dataflow.worker.windmill.streams.AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS;
+
+import com.google.common.base.Suppliers;
+import java.io.PrintWriter;
+import java.util.Collections;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints;
+import 
org.apache.beam.runners.dataflow.worker.windmill.streams.AbstractWindmillStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.streams.WindmillStream.CommitWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.streams.WindmillStream.GetDataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.streams.WindmillStream.GetWorkStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.streams.WindmillStream.GetWorkerMetadataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.streams.observers.StreamObserverFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.util.ThrottleTimer;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+public class GrpcWindmillStreamFactory implements StatusDataProvider {
+  private static final Duration MIN_BACKOFF = Duration.millis(1);
+  private static final Duration DEFAULT_MAX_BACKOFF = 
Duration.standardSeconds(30);
+  private static final int DEFAULT_LOG_EVERY_N_STREAM_FAILURES = 1;
+  private static final int DEFAULT_STREAMING_RPC_BATCH_LIMIT = 
Integer.MAX_VALUE;
+  private static final int DEFAULT_WINDMILL_MESSAGES_BETWEEN_IS_READY_CHECKS = 
1;
+
+  private final JobHeader jobHeader;
+  private final int logEveryNStreamFailures;
+  private final int streamingRpcBatchLimit;
+  private final int windmillMessagesBetweenIsReadyChecks;
+  private final Supplier<BackOff> grpcBackOff;
+  private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
+  private final AtomicLong streamIdGenerator;
+
+  private GrpcWindmillStreamFactory(
+      JobHeader jobHeader,
+      int logEveryNStreamFailures,
+      int streamingRpcBatchLimit,
+      int windmillMessagesBetweenIsReadyChecks,
+      Supplier<Duration> maxBackOff) {
+    this.jobHeader = jobHeader;
+    this.logEveryNStreamFailures = logEveryNStreamFailures;
+    this.streamingRpcBatchLimit = streamingRpcBatchLimit;
+    this.windmillMessagesBetweenIsReadyChecks = 
windmillMessagesBetweenIsReadyChecks;
+    // Configure backoff to retry calls forever, with a maximum sane retry 
interval.
+    this.grpcBackOff =
+        Suppliers.memoize(
+            () ->
+                FluentBackoff.DEFAULT
+                    .withInitialBackoff(MIN_BACKOFF)
+                    .withMaxBackoff(maxBackOff.get())
+                    .backoff());
+    this.streamRegistry = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    this.streamIdGenerator = new AtomicLong();
+  }
+
+  private static CloudWindmillServiceV1Alpha1Stub withDeadline(
+      CloudWindmillServiceV1Alpha1Stub stub) {
+    // Deadlines are absolute points in time, so generate a new one everytime 
this function is
+    // called.
+    return stub.withDeadlineAfter(
+        AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, 
TimeUnit.SECONDS);
+  }
+
+  static GrpcWindmillStreamFactory.Builder of(JobHeader jobHeader) {
+    return new GrpcWindmillStreamFactory.Builder(jobHeader);
+  }
+
+  public GetWorkStream createGetWorkStream(
+      CloudWindmillServiceV1Alpha1Stub stub,
+      GetWorkRequest request,
+      ThrottleTimer getWorkThrottleTimer,
+      WorkItemReceiver processWorkItem) {
+    return GrpcGetWorkStream.create(
+        responseObserver -> withDeadline(stub).getWorkStream(responseObserver),
+        request,
+        grpcBackOff.get(),
+        newStreamObserverFactory(),
+        streamRegistry,
+        logEveryNStreamFailures,
+        getWorkThrottleTimer,
+        processWorkItem);
+  }
+
+  public GetDataStream createGetDataStream(
+      CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer 
getDataThrottleTimer) {
+    return GrpcGetDataStream.create(
+        responseObserver -> withDeadline(stub).getDataStream(responseObserver),
+        grpcBackOff.get(),
+        newStreamObserverFactory(),
+        streamRegistry,
+        logEveryNStreamFailures,
+        getDataThrottleTimer,
+        jobHeader,
+        streamIdGenerator,
+        streamingRpcBatchLimit);
+  }
+
+  public CommitWorkStream createCommitWorkStream(
+      CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer 
commitWorkThrottleTimer) {
+    return GrpcCommitWorkStream.create(
+        responseObserver -> 
withDeadline(stub).commitWorkStream(responseObserver),
+        grpcBackOff.get(),
+        newStreamObserverFactory(),
+        streamRegistry,
+        logEveryNStreamFailures,
+        commitWorkThrottleTimer,
+        jobHeader,
+        streamIdGenerator,
+        streamingRpcBatchLimit);
+  }
+
+  public GetWorkerMetadataStream createGetWorkerMetadataStream(
+      CloudWindmillServiceV1Alpha1Stub stub,
+      ThrottleTimer getWorkerMetadataThrottleTimer,
+      Consumer<WindmillEndpoints> onNewWindmillEndpoints) {
+    return GrpcGetWorkerMetadataStream.create(
+        responseObserver -> 
withDeadline(stub).getWorkerMetadataStream(responseObserver),
+        grpcBackOff.get(),
+        newStreamObserverFactory(),
+        streamRegistry,
+        logEveryNStreamFailures,
+        jobHeader,
+        0,
+        getWorkerMetadataThrottleTimer,
+        onNewWindmillEndpoints);
+  }
+
+  private StreamObserverFactory newStreamObserverFactory() {
+    return StreamObserverFactory.direct(
+        DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, 
windmillMessagesBetweenIsReadyChecks);
+  }
+
+  /**
+   * Schedules streaming RPC health checks to run on a background daemon 
thread, which will be
+   * cleaned up when the JVM shutdown.
+   */
+  private void scheduleHealthChecks(int healthCheckInterval) {
+    new Timer("WindmillHealthCheckTimer")
+        .schedule(
+            new TimerTask() {
+              @Override
+              public void run() {
+                Instant reportThreshold = 
Instant.now().minus(Duration.millis(healthCheckInterval));
+                for (AbstractWindmillStream<?, ?> stream : streamRegistry) {
+                  stream.maybeSendHealthCheck(reportThreshold);
+                }
+              }
+            },
+            0,
+            healthCheckInterval);
+  }
+
+  @Override
+  public void appendSummaryHtml(PrintWriter writer) {
+    writer.write("Active Streams:<br>");
+    for (AbstractWindmillStream<?, ?> stream : streamRegistry) {
+      stream.appendSummaryHtml(writer);
+      writer.write("<br>");
+    }
+  }
+
+  static class Builder {

Review Comment:
   could use @ AutoValue.Builder perhaps to reduce boilerplate



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
+
+import com.google.auto.value.AutoValue;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.streams.WindmillStream;
+
+/**
+ * Budget of items and bytes for fetching {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem}(s) via 
{@link
+ * WindmillStream.GetWorkStream}. Used to control how "much" work is returned 
from Windmill.
+ */
+@AutoValue
+public abstract class GetWorkBudget {
+  public static GetWorkBudget.Builder builder() {
+    return new AutoValue_GetWorkBudget.Builder();
+  }
+
+  /** {@link GetWorkBudget} of 0. */
+  public static GetWorkBudget noBudget() {
+    return builder().setItems(0).setBytes(0).build();
+  }
+
+  public static GetWorkBudget from(GetWorkRequest getWorkRequest) {
+    return builder()
+        .setItems(getWorkRequest.getMaxItems())
+        .setBytes(getWorkRequest.getMaxBytes())
+        .build();
+  }
+
+  /**
+   * Adds the given bytes and items or the current budget, returning a new 
{@link GetWorkBudget}.
+   * Does not drop below 0.
+   */
+  public GetWorkBudget add(long items, long bytes) {
+    return GetWorkBudget.builder()
+        .setBytes(Math.max(0, bytes() + bytes))

Review Comment:
   build takes care of max, remove here and in subtract?
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
+
+import com.google.auto.value.AutoValue;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.streams.WindmillStream;
+
+/**
+ * Budget of items and bytes for fetching {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem}(s) via 
{@link
+ * WindmillStream.GetWorkStream}. Used to control how "much" work is returned 
from Windmill.
+ */
+@AutoValue
+public abstract class GetWorkBudget {
+  public static GetWorkBudget.Builder builder() {
+    return new AutoValue_GetWorkBudget.Builder();
+  }
+
+  /** {@link GetWorkBudget} of 0. */
+  public static GetWorkBudget noBudget() {
+    return builder().setItems(0).setBytes(0).build();
+  }
+
+  public static GetWorkBudget from(GetWorkRequest getWorkRequest) {
+    return builder()
+        .setItems(getWorkRequest.getMaxItems())
+        .setBytes(getWorkRequest.getMaxBytes())
+        .build();
+  }
+
+  /**
+   * Adds the given bytes and items or the current budget, returning a new 
{@link GetWorkBudget}.
+   * Does not drop below 0.
+   */
+  public GetWorkBudget add(long items, long bytes) {
+    return GetWorkBudget.builder()
+        .setBytes(Math.max(0, bytes() + bytes))
+        .setItems(Math.max(0, items() + items))
+        .build();
+  }
+
+  public GetWorkBudget add(GetWorkBudget other) {
+    return add(other.items(), other.bytes());
+  }
+
+  /**
+   * Subtracts the given bytes and items or the current budget, returning a 
new {@link
+   * GetWorkBudget}. Does not drop below 0.
+   */
+  public GetWorkBudget subtract(long items, long bytes) {
+    return GetWorkBudget.builder()
+        .setBytes(Math.max(0, bytes() - bytes))
+        .setItems(Math.max(0, items() - items))
+        .build();
+  }
+
+  public GetWorkBudget subtract(GetWorkBudget other) {
+    return subtract(other.items(), other.bytes());
+  }
+
+  public boolean isLessThanFiftyPercentOf(GetWorkBudget target) {

Review Comment:
   rm, this seems too specific for a class method to me. It can just be a helper 
function in whatever wants to do this.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
+
+import com.google.auto.value.AutoValue;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.streams.WindmillStream;
+
+/**
+ * Budget of items and bytes for fetching {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem}(s) via 
{@link
+ * WindmillStream.GetWorkStream}. Used to control how "much" work is returned 
from Windmill.
+ */
+@AutoValue
+public abstract class GetWorkBudget {
+  public static GetWorkBudget.Builder builder() {
+    return new AutoValue_GetWorkBudget.Builder();
+  }
+
+  /** {@link GetWorkBudget} of 0. */
+  public static GetWorkBudget noBudget() {
+    return builder().setItems(0).setBytes(0).build();
+  }
+
+  public static GetWorkBudget from(GetWorkRequest getWorkRequest) {
+    return builder()
+        .setItems(getWorkRequest.getMaxItems())
+        .setBytes(getWorkRequest.getMaxBytes())
+        .build();
+  }
+
+  /**
+   * Adds the given bytes and items or the current budget, returning a new 
{@link GetWorkBudget}.
+   * Does not drop below 0.
+   */
+  public GetWorkBudget add(long items, long bytes) {
+    return GetWorkBudget.builder()
+        .setBytes(Math.max(0, bytes() + bytes))
+        .setItems(Math.max(0, items() + items))
+        .build();
+  }
+
+  public GetWorkBudget add(GetWorkBudget other) {
+    return add(other.items(), other.bytes());
+  }
+
+  /**
+   * Subtracts the given bytes and items or the current budget, returning a 
new {@link
+   * GetWorkBudget}. Does not drop below 0.
+   */
+  public GetWorkBudget subtract(long items, long bytes) {

Review Comment:
   enforce items/bytes non-negative?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2843,7 +2843,7 @@ public void testActiveThreadMetric() throws Exception {
     synchronized (this) {
       executor.execute(m2, m2.getWorkItem().getSerializedSize());
       this.wait();
-      // Seems current executor executes the initial work item twice
+      //      // Seems current executor executes the initial work item twice

Review Comment:
   revert



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/streams/observers/ForwardingClientResponseObserver.java:
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.windmill;
+package org.apache.beam.runners.dataflow.worker.windmill.streams.observers;

Review Comment:
   move to grpc?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/util/VendoredRequestMetadataCallbackAdapter.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.util;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Create a wrapper around credentials callback that delegates to the 
underlying vendored {@link
+ * com.google.auth.RequestMetadataCallback}. Note that this class should 
override every method that
+ * is not final and not static and call the delegate directly.
+ *
+ * <p>TODO: Replace this with an auto generated proxy which calls the 
underlying implementation
+ * delegate to reduce maintenance burden.
+ */
+public class VendoredRequestMetadataCallbackAdapter
+    implements com.google.auth.RequestMetadataCallback {
+
+  private final 
org.apache.beam.vendor.grpc.v1p54p0.com.google.auth.RequestMetadataCallback

Review Comment:
   also put in grpc package?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/util/VendoredCredentialsAdapter.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.util;

Review Comment:
   in streams.grpc? grpc specific



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudget.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
+
+import com.google.auto.value.AutoValue;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.streams.WindmillStream;
+
+/**
+ * Budget of items and bytes for fetching {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem}(s) via 
{@link
+ * WindmillStream.GetWorkStream}. Used to control how "much" work is returned 
from Windmill.
+ */
+@AutoValue
+public abstract class GetWorkBudget {
+  public static GetWorkBudget.Builder builder() {
+    return new AutoValue_GetWorkBudget.Builder();
+  }
+
+  /** {@link GetWorkBudget} of 0. */
+  public static GetWorkBudget noBudget() {
+    return builder().setItems(0).setBytes(0).build();
+  }
+
+  public static GetWorkBudget from(GetWorkRequest getWorkRequest) {
+    return builder()
+        .setItems(getWorkRequest.getMaxItems())
+        .setBytes(getWorkRequest.getMaxBytes())
+        .build();
+  }
+
+  /**
+   * Adds the given bytes and items or the current budget, returning a new 
{@link GetWorkBudget}.
+   * Does not drop below 0.
+   */
+  public GetWorkBudget add(long items, long bytes) {

Review Comment:
   enforce items/bytes are non-negative?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to